You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/07/12 04:52:46 UTC
[incubator-nemo] branch master updated: [NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 6804b98 [NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
6804b98 is described below
commit 6804b98be19c18f819c7b26355631df365d4533c
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Jul 12 13:52:44 2018 +0900
[NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
JIRA: [NEMO-55: Handle NCS Master-to-Executor RPC failures](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-55)
**Major changes:**
- Ignores NCS RPC failures, on the assumption that executor failures will be handled by the FailedEvaluator event
- Introduces the concept of 'poisoned' resources for integration tests
- Improves the scheduling logic in the master and the exception-handling logic in the data plane, so that the added integration test passes
**Minor changes to note:**
- Reorders some methods to group similar methods together
- Prettier logs and more helpful comments
**Tests for the changes:**
- AlternatingLeastSquareITCase#testPadoWithPoison: Fails the TRANSIENT resource every 1-3 seconds. On my Mac, the resource fails and is reacquired about 3-6 times before the job completes and the test passes.
**Other comments:**
- https://issues.apache.org/jira/browse/NEMO-140 has been filed for more general handling of RPCs
- Will soon file issues for refactoring the data plane and for making it easier to see how exceptions are handled
resolves [NEMO-55](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-55)
---
.../main/java/edu/snu/nemo/client/JobLauncher.java | 2 +-
.../src/main/java/edu/snu/nemo/common/dag/DAG.java | 2 +-
.../java/edu/snu/nemo/common/test/ArgBuilder.java | 5 +
conf/src/main/java/edu/snu/nemo/conf/JobConf.java | 55 ++---
.../beam/AlternatingLeastSquareITCase.java | 24 ++-
.../beam/policy/PadoPolicyParallelismTen.java | 44 ++++
.../beam_sample_poisoned_executor_resources.json | 13 ++
.../common/message/FailedMessageSender.java | 42 ++++
.../common/message/ncs/NcsMessageContext.java | 4 +-
.../common/message/ncs/NcsMessageEnvironment.java | 30 +--
.../snu/nemo/runtime/common/metric/TaskMetric.java | 2 +-
.../snu/nemo/runtime/common/plan/StageEdge.java | 5 +
.../edu/snu/nemo/runtime/common/plan/Task.java | 7 +-
.../snu/nemo/runtime/common/state/TaskState.java | 2 +
.../main/java/edu/snu/nemo/driver/NemoContext.java | 33 ++-
.../main/java/edu/snu/nemo/driver/NemoDriver.java | 4 +-
.../edu/snu/nemo/driver/UserApplicationRunner.java | 6 +-
.../edu/snu/nemo/runtime/executor/Executor.java | 5 +-
.../nemo/runtime/executor/MetricManagerWorker.java | 4 +-
.../executor/bytetransfer/ByteTransferContext.java | 6 +-
.../executor/bytetransfer/ByteTransport.java | 8 +-
.../bytetransfer/ClosableBlockingQueue.java | 16 +-
.../runtime/executor/data/BlockManagerWorker.java | 235 ++++++++++++---------
...ctionQueue.java => BlockTransferThrottler.java} | 13 +-
.../runtime/executor/datatransfer/InputReader.java | 9 +-
.../nemo/runtime/executor/task/TaskExecutor.java | 4 +-
...ueTest.java => BlockTransferThrottlerTest.java} | 25 ++-
.../nemo/runtime/master/BlockManagerMaster.java | 4 +-
.../edu/snu/nemo/runtime/master/BlockMetadata.java | 10 +-
.../snu/nemo/runtime/master/JobStateManager.java | 4 +-
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 14 +-
.../runtime/master/resource/ContainerManager.java | 22 +-
.../master/resource/ExecutorRepresenter.java | 18 --
.../master/resource/ResourceSpecification.java | 65 ++----
.../master/scheduler/BatchSingleJobScheduler.java | 46 ++--
.../runtime/master/scheduler/SchedulerRunner.java | 6 +-
.../nemo/runtime/master/ContainerManagerTest.java | 10 +-
.../{TaskRestartTest.java => TaskRetryTest.java} | 4 +-
38 files changed, 487 insertions(+), 321 deletions(-)
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 6826174..0b80b83 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -245,7 +245,7 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
- cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
+ cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class);
cl.registerShortNameOfClass(JobConf.FileDirectory.class);
cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class);
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 46198f0..aeaea6f 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -411,7 +411,7 @@ public final class DAG<V extends Vertex, E extends Edge<V>> implements Serializa
try (final PrintWriter printWriter = new PrintWriter(file)) {
printWriter.println(toString());
printWriter.close();
- LOG.info(String.format("DAG JSON for %s is saved at %s"
+ LOG.debug(String.format("DAG JSON for %s is saved at %s"
+ " (Use https://service.jangho.kr/nemo-dag/ to visualize it.)", description, file.getPath()));
} catch (IOException e) {
LOG.warn(String.format("Cannot store JSON representation of %s to %s: %s",
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
index aeb27a8..9b11b56 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
@@ -71,6 +71,11 @@ public final class ArgBuilder {
return this;
}
+ public ArgBuilder addMaxTaskAttempt(final int maxAttempt) {
+ args.add(Arrays.asList("-max_task_attempt", String.valueOf(maxAttempt)));
+ return this;
+ }
+
/**
* @param directory directory to save the DAG.
* @return builder with the DAG directory.
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index bed8099..8d016af 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -99,7 +99,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
public final class OptimizationPolicy implements Name<String> {
}
- //////////////////////////////// Runtime Configurations
+ //////////////////////////////// Runtime Master-Executor Common Configurations
/**
* Deploy mode.
@@ -109,6 +109,16 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
+ * The fraction of container memory not to use fo the JVM heap.
+ */
+ @NamedParameter(doc = "The fraction of the container memory not to use for the JVM heap", short_name = "heap_slack",
+ default_value = "0.3")
+ public final class JVMHeapSlack implements Name<Double> {
+ }
+
+ //////////////////////////////// Runtime Master Configurations
+
+ /**
* Nemo driver memory.
*/
@NamedParameter(doc = "Nemo driver memory", short_name = "driver_mem_mb", default_value = "1024")
@@ -116,6 +126,24 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
+<<<<<<< HEAD
+ * Max number of attempts for task scheduling.
+ */
+ @NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
+ public final class MaxTaskAttempt implements Name<Integer> {
+ }
+
+ //////////////////////////////// Runtime Executor Configurations
+
+ /**
+ * Used for fault-injected tests.
+ */
+ @NamedParameter(doc = "Executor crashes after expected time, does not crash when -1",
+ short_name = "executor_poison_sec", default_value = "-1")
+ public final class ExecutorPosionSec implements Name<Integer> {
+ }
+
+ /**
* Path to the JSON file that specifies bandwidth between locations.
*/
@NamedParameter(doc = "Path to the JSON file that specifies bandwidth between locations",
@@ -138,13 +166,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
public final class BandwidthJSONContents implements Name<String> {
}
- /**
- * The fraction of container memory not to use fo the JVM heap.
- */
- @NamedParameter(doc = "The fraction of the container memory not to use for the JVM heap", short_name = "heap_slack",
- default_value = "0.3")
- public final class JVMHeapSlack implements Name<Double> {
- }
/**
* Contents of the JSON file that specifies resource layout.
@@ -153,16 +174,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
public final class ExecutorJSONContents implements Name<String> {
}
- /**
- * Executor capacity.
- * Determines the number of Task 'slots' for each executor.
- * 1) Master's Task scheduler can use this number in scheduling.
- * (e.g., schedule Task to the executor currently with the maximum number of available slots)
- * 2) Executor's number of Task execution threads is set to this number.
- */
- @NamedParameter(doc = "Executor capacity", short_name = "executor_capacity", default_value = "1")
- public final class ExecutorCapacity implements Name<Integer> {
- }
+ //////////////////////////////// Runtime Data Plane Configurations
/**
* Number of I/O threads for block fetch requests from other executor.
@@ -181,13 +193,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
- * Max number of attempts for task scheduling.
- */
- @NamedParameter(doc = "Max number of schedules", short_name = "max_schedule_attempt", default_value = "3")
- public final class MaxScheduleAttempt implements Name<Integer> {
- }
-
- /**
* The number of serialization threads for scheduling.
*/
@NamedParameter(doc = "Number of serialization thread for scheduling", short_name = "schedule_ser_thread",
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index e4e71ae..a5b1f39 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -18,8 +18,9 @@ package edu.snu.nemo.examples.beam;
import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.test.ArgBuilder;
import edu.snu.nemo.common.test.ExampleTestUtil;
-import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismTen;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class AlternatingLeastSquareITCase {
- private static final int TIMEOUT = 240000;
+ private static final int TIMEOUT = 240 * 1000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -41,7 +42,8 @@ public final class AlternatingLeastSquareITCase {
private static final String outputFileName = "sample_output_als";
private static final String output = fileBasePath + outputFileName;
private static final String testResourceFileName = "test_output_als";
- private static final String executorResourceFileName = fileBasePath + "beam_sample_executor_resources.json";
+ private static final String noPoisonResources = fileBasePath + "beam_sample_executor_resources.json";
+ private static final String poisonedResource = fileBasePath + "beam_sample_poisoned_executor_resources.json";
private static final String numFeatures = "10";
private static final String numIteration = "3";
private static final String lambda = "0.05";
@@ -49,7 +51,6 @@ public final class AlternatingLeastSquareITCase {
@Before
public void setUp() throws Exception {
builder = new ArgBuilder()
- .addResourceJson(executorResourceFileName)
.addUserMain(AlternatingLeastSquare.class.getCanonicalName())
.addUserArgs(input, numFeatures, numIteration, lambda, output);
}
@@ -64,18 +65,21 @@ public final class AlternatingLeastSquareITCase {
}
@Test (timeout = TIMEOUT)
- public void test() throws Exception {
+ public void testDefault() throws Exception {
JobLauncher.main(builder
- .addJobId(AlternatingLeastSquareITCase.class.getSimpleName())
- .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .addResourceJson(noPoisonResources)
+ .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_default")
+ .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
.build());
}
@Test (timeout = TIMEOUT)
- public void testPado() throws Exception {
+ public void testPadoWithPoison() throws Exception {
JobLauncher.main(builder
- .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
- .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
+ .addResourceJson(poisonedResource)
+ .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado_poisoned")
+ .addMaxTaskAttempt(Integer.MAX_VALUE)
+ .addOptimizationPolicy(PadoPolicyParallelismTen.class.getCanonicalName())
.build());
}
}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
new file mode 100644
index 0000000..53e82cc
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam.policy;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.List;
+
+/**
+ * A pado policy with fixed parallelism 10 for tests.
+ */
+public final class PadoPolicyParallelismTen implements Policy {
+ private final Policy policy;
+
+ public PadoPolicyParallelismTen() {
+ this.policy = PolicyTestUtil.overwriteParallelism(10, PadoPolicy.class.getCanonicalName());
+ }
+
+ @Override
+ public List<CompileTimePass> getCompileTimePasses() {
+ return this.policy.getCompileTimePasses();
+ }
+
+ @Override
+ public List<RuntimePass<?>> getRuntimePasses() {
+ return this.policy.getRuntimePasses();
+ }
+}
diff --git a/examples/resources/beam_sample_poisoned_executor_resources.json b/examples/resources/beam_sample_poisoned_executor_resources.json
new file mode 100644
index 0000000..b7614a9
--- /dev/null
+++ b/examples/resources/beam_sample_poisoned_executor_resources.json
@@ -0,0 +1,13 @@
+[
+ {
+ "type": "Transient",
+ "memory_mb": 512,
+ "capacity": 15,
+ "poison_sec": 2
+ },
+ {
+ "type": "Reserved",
+ "memory_mb": 512,
+ "capacity": 15
+ }
+]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
new file mode 100644
index 0000000..1218b1e
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.message;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A message sender that failed.
+ */
+public final class FailedMessageSender implements MessageSender<ControlMessage.Message> {
+ @Override
+ public void send(final ControlMessage.Message message) {
+ // Do nothing.
+ }
+
+ @Override
+ public CompletableFuture<ControlMessage.Message> request(final ControlMessage.Message message) {
+ final CompletableFuture<ControlMessage.Message> failed = new CompletableFuture<>();
+ failed.completeExceptionally(new Throwable("Failed Message Sender"));
+ return failed;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Do nothing.
+ }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
index ea478b7..beefcef 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
@@ -56,7 +56,9 @@ final class NcsMessageContext implements MessageContext {
// We do not call connection.close since NCS caches connection.
// Disabling Sonar warning (squid:S2095)
} catch (final NetworkException e) {
- throw new RuntimeException("Cannot connect to " + senderId, e);
+ // TODO #140: Properly classify and handle each RPC failure
+ // Not logging the stacktrace here, as it's not very useful.
+ LOG.error("NCS Exception");
}
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 4d07752..676cb15 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -27,10 +27,11 @@ import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.transport.LinkListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.net.SocketAddress;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@ import java.util.concurrent.Future;
* Message environment for NCS.
*/
public final class NcsMessageEnvironment implements MessageEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(NcsMessageEnvironment.class.getName());
+
private static final String NCS_CONN_FACTORY_ID = "NCS_CONN_FACTORY_ID";
private final NetworkConnectionService networkConnectionService;
@@ -62,7 +65,7 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
this.senderId = senderId;
this.replyFutureMap = new ReplyFutureMap<>();
this.listenerConcurrentMap = new ConcurrentHashMap<>();
- this.receiverToConnectionMap = new HashMap<>();
+ this.receiverToConnectionMap = new ConcurrentHashMap<>();
this.connectionFactory = networkConnectionService.registerConnectionFactory(
idFactory.getNewInstance(NCS_CONN_FACTORY_ID),
new ControlMessageCodec(),
@@ -87,17 +90,15 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
public <T> Future<MessageSender<T>> asyncConnect(final String receiverId, final String listenerId) {
try {
// If the connection toward the receiver exists already, reuses it.
- final Connection connection = receiverToConnectionMap.computeIfAbsent(receiverId, absentReceiverId -> {
- try {
- final Connection newConnection = connectionFactory.newConnection(idFactory.getNewInstance(absentReceiverId));
- newConnection.open();
- return newConnection;
- } catch (final NetworkException e) {
- throw new RuntimeException(e);
- }
- });
+ final Connection connection;
+ if (receiverToConnectionMap.containsKey(receiverId)) {
+ connection = receiverToConnectionMap.get(receiverId);
+ } else {
+ connection = connectionFactory.newConnection(idFactory.getNewInstance(receiverId));
+ connection.open();
+ }
return CompletableFuture.completedFuture((MessageSender) new NcsMessageSender(connection, replyFutureMap));
- } catch (final Exception e) {
+ } catch (final NetworkException e) {
final CompletableFuture<MessageSender<T>> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(e);
return failedFuture;
@@ -166,8 +167,9 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
public void onException(final Throwable throwable,
final SocketAddress socketAddress,
final Message<ControlMessage.Message> messages) {
- final ControlMessage.Message controlMessage = extractSingleMessage(messages);
- throw new RuntimeException(controlMessage.toString(), throwable);
+ // TODO #140: Properly classify and handle each RPC failure
+ // Not logging the stacktrace here, as it's not very useful.
+ LOG.error("NCS Exception");
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
index db24f4d..aaa2337 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
@@ -112,7 +112,7 @@ public class TaskMetric implements StateMetric<TaskState.State> {
@Override
public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
- LOG.info("metric {} is just arrived!", metricField);
+ LOG.debug("metric {} is just arrived!", metricField);
switch (metricField) {
case "serializedReadBytes":
setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index ad62e68..5750c2a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -119,6 +119,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
return sb.toString();
}
+ @Override
+ public String toString() {
+ return propertiesToJSON();
+ }
+
/**
* @return the list between the task idx and key range to read.
*/
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index ce0a4ff..7663a8c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -18,7 +18,6 @@ package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
-import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
import java.util.List;
@@ -145,8 +144,10 @@ public final class Task implements Serializable {
sb.append(taskId);
sb.append(" / attempt: ");
sb.append(attemptIdx);
- sb.append(" / irDAG: ");
- sb.append(SerializationUtils.deserialize(serializedIRDag));
+ sb.append(" / incoming: ");
+ sb.append(taskIncomingEdges);
+ sb.append(" / outgoing: ");
+ sb.append(taskOutgoingEdges);
sb.append("/ exec props: ");
sb.append(getExecutionProperties());
return sb.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 74b808c..be83bf2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -57,6 +57,8 @@ public final class TaskState {
// From SHOULD_RETRY
stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.READY, "Ready to be retried");
+ stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.SHOULD_RETRY,
+ "SHOULD_RETRY can be caused by multiple reasons");
stateMachineBuilder.setInitialState(State.READY);
return stateMachineBuilder.build();
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
index d89b7e0..491e950 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
@@ -15,16 +15,20 @@
*/
package edu.snu.nemo.driver;
+import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.executor.Executor;
import org.apache.reef.annotations.audience.EvaluatorSide;
import org.apache.reef.evaluator.context.events.ContextStart;
import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+import java.util.Random;
/**
* REEF Context for the Executor.
@@ -32,13 +36,21 @@ import javax.inject.Inject;
@EvaluatorSide
@Unit
public final class NemoContext {
-
private static final Logger LOG = LoggerFactory.getLogger(NemoContext.class.getName());
private final Executor executor;
+ private final Clock clock;
+ private final int crashTimeSec;
+
@Inject
- private NemoContext(final Executor executor) {
+ private NemoContext(final Executor executor,
+ @Parameter(JobConf.ExecutorPosionSec.class) final int crashTimeSec,
+ final Clock clock) {
this.executor = executor; // To make Tang instantiate Executor
+
+ // For poison handling
+ this.clock = clock;
+ this.crashTimeSec = crashTimeSec;
}
/**
@@ -48,6 +60,16 @@ public final class NemoContext {
@Override
public void onNext(final ContextStart contextStart) {
LOG.info("Context Started: Executor is now ready and listening for messages");
+
+ // For poison handling
+ if (crashTimeSec >= 0) {
+ final int crashTimeMs = addNoise(crashTimeSec * 1000);
+ LOG.info("Configured {} sec crash time, and actually crashing in {} ms (noise)", crashTimeSec, crashTimeMs);
+ clock.scheduleAlarm(crashTimeMs, (alarm) -> {
+ LOG.info("Poison: crashing immediately");
+ Runtime.getRuntime().halt(1); // Forces this JVM to shut down immediately.
+ });
+ }
}
}
@@ -55,10 +77,15 @@ public final class NemoContext {
* Called when the context is stopped.
*/
public final class ContextStopHandler implements EventHandler<ContextStop> {
-
@Override
public void onNext(final ContextStop contextStop) {
executor.terminate();
}
}
+
+ private int addNoise(final int number) {
+ final Random random = new Random();
+ final int fiftyPercent = random.nextInt((int) (number * (50.0 / 100.0)));
+ return random.nextBoolean() ? number + fiftyPercent : number - fiftyPercent; // -50% ~ +50%
+ }
}
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index f4493e8..4519116 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -23,6 +23,7 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageParameters;
import edu.snu.nemo.runtime.master.ClientRPC;
import edu.snu.nemo.runtime.master.RuntimeMaster;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
@@ -158,7 +159,8 @@ public final class NemoDriver {
*/
public void startSchedulingUserApplication(final String dagString) {
// Launch user application (with a new thread)
- final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor();
+ final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor(
+ new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
userApplicationRunnerThread.execute(() -> userApplicationRunner.run(dagString));
userApplicationRunnerThread.shutdown();
}
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 6ba615f..de2dcb0 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -58,7 +58,7 @@ public final class UserApplicationRunner {
@Inject
private UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
@Parameter(JobConf.OptimizationPolicy.class) final String optimizationPolicy,
- @Parameter(JobConf.MaxScheduleAttempt.class) final int maxScheduleAttempt,
+ @Parameter(JobConf.MaxTaskAttempt.class) final int maxScheduleAttempt,
final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
final Injector injector,
final RuntimeMaster runtimeMaster) {
@@ -80,7 +80,7 @@ public final class UserApplicationRunner {
*/
public void run(final String dagString) {
try {
- LOG.info("##### Nemo Compiler #####");
+ LOG.info("##### Nemo Compiler Start #####");
final DAG<IRVertex, IREdge> dag = SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
dag.storeJSON(dagDirectory, "ir", "IR before optimization");
@@ -103,6 +103,8 @@ public final class UserApplicationRunner {
final PhysicalPlan physicalPlan = backend.compile(optimizedDAG);
+ LOG.info("##### Nemo Compiler Finish #####");
+
physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical execution plan by compiler");
// Execute!
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8852e0e..25f6224 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -37,6 +37,7 @@ import edu.snu.nemo.runtime.executor.data.SerializerManager;
import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
import edu.snu.nemo.runtime.executor.task.TaskExecutor;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
@@ -80,7 +81,9 @@ public final class Executor {
final DataTransferFactory dataTransferFactory,
final MetricManagerWorker metricMessageSender) {
this.executorId = executorId;
- this.executorService = Executors.newCachedThreadPool();
+ this.executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder()
+ .namingPattern("TaskExecutor thread-%d")
+ .build());
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.serializerManager = serializerManager;
this.dataTransferFactory = dataTransferFactory;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 405ce4c..d8f3b41 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -70,10 +70,10 @@ public final class MetricManagerWorker implements MetricMessageSender {
final ControlMessage.MetricMsg.Builder metricMsgBuilder = ControlMessage.MetricMsg.newBuilder();
- LOG.info("MetricManagerWorker Size: {}", size);
+ LOG.debug("MetricManagerWorker Size: {}", size);
for (int i = 0; i < size; i++) {
final ControlMessage.Metric metric = metricMessageQueue.poll();
- LOG.info("MetricManagerWorker addMetric: {}, {}, {}", size, i, metric);
+ LOG.debug("MetricManagerWorker addMetric: {}, {}, {}", size, i, metric);
metricMsgBuilder.addMetric(i, metric);
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
index ec8977b..7cfc898 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
@@ -119,11 +119,7 @@ public abstract class ByteTransferContext {
return;
}
hasException = true;
- if (cause == null) {
- LOG.error(String.format("A channel exception set on %s", toString()));
- } else {
- LOG.error(String.format("A channel exception set on %s", toString()), cause);
- }
+ LOG.error(String.format("A channel exception set on %s", toString())); // Not logging throwable, which isn't useful
exception = cause;
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
index fa5116b..3e7aed6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
@@ -209,12 +209,8 @@ final class ByteTransport implements AutoCloseable {
LOG.debug("Connected to {}", remoteExecutorId);
return;
}
- // Failed to connect
- if (future.cause() == null) {
- LOG.error("Failed to connect to {}", remoteExecutorId);
- } else {
- LOG.error(String.format("Failed to connect to %s", remoteExecutorId), future.cause());
- }
+ // Failed to connect (Not logging the cause here, which is not very useful)
+ LOG.error("Failed to connect to {}", remoteExecutorId);
});
return connectFuture;
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
index 872b275..faf3ed1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
@@ -91,13 +91,15 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {
*/
@Nullable
public synchronized T take() throws InterruptedException {
+ while (queue.isEmpty() && !closed) {
+ wait();
+ }
+
+ // This should come after wait(), to be always checked on close
if (throwable != null) {
throw new RuntimeException(throwable);
}
- while (queue.isEmpty() && !closed) {
- wait();
- }
// retrieves and removes the head of the underlying collection, or return null if the queue is empty
return queue.poll();
}
@@ -110,13 +112,15 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {
*/
@Nullable
public synchronized T peek() throws InterruptedException {
+ while (queue.isEmpty() && !closed) {
+ wait();
+ }
+
+ // This should come after wait(), to be always checked on close
if (throwable != null) {
throw new RuntimeException(throwable);
}
- while (queue.isEmpty() && !closed) {
- wait();
- }
// retrieves the head of the underlying collection, or return null if the queue is empty
return queue.peek();
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index c3d41f7..d6a3da7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -61,18 +61,23 @@ public final class BlockManagerWorker {
private static final String REMOTE_FILE_STORE = "REMOTE_FILE_STORE";
private final String executorId;
+ private final SerializerManager serializerManager;
+
+ // Block stores
private final MemoryStore memoryStore;
private final SerializedMemoryStore serializedMemoryStore;
private final LocalFileStore localFileStore;
private final RemoteFileStore remoteFileStore;
+
+ // To-Master connections
private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
+ private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
+
+ // To-Executor connections
private final ByteTransfer byteTransfer;
- // Executor service to schedule I/O Runnable which can be done in background.
private final ExecutorService backgroundExecutorService;
private final Map<String, AtomicInteger> blockToRemainingRead;
- private final SerializerManager serializerManager;
- private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
- private final BlockTransferConnectionQueue blockTransferConnectionQueue;
+ private final BlockTransferThrottler blockTransferThrottler;
/**
* Constructor.
@@ -86,7 +91,7 @@ public final class BlockManagerWorker {
* @param persistentConnectionToMasterMap the connection map.
* @param byteTransfer the byte transfer.
* @param serializerManager the serializer manager.
- * @param blockTransferConnectionQueue restricts parallel connections
+ * @param blockTransferThrottler restricts parallel connections
*/
@Inject
private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
@@ -98,7 +103,7 @@ public final class BlockManagerWorker {
final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
final ByteTransfer byteTransfer,
final SerializerManager serializerManager,
- final BlockTransferConnectionQueue blockTransferConnectionQueue) {
+ final BlockTransferThrottler blockTransferThrottler) {
this.executorId = executorId;
this.memoryStore = memoryStore;
this.serializedMemoryStore = serializedMemoryStore;
@@ -110,9 +115,11 @@ public final class BlockManagerWorker {
this.blockToRemainingRead = new ConcurrentHashMap<>();
this.serializerManager = serializerManager;
this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
- this.blockTransferConnectionQueue = blockTransferConnectionQueue;
+ this.blockTransferThrottler = blockTransferThrottler;
}
+ //////////////////////////////////////////////////////////// Main public methods
+
/**
* Creates a new block.
*
@@ -128,52 +135,6 @@ public final class BlockManagerWorker {
}
/**
- * Retrieves data from the stored block. A specific hash value range can be designated.
- *
- * @param blockId of the block.
- * @param blockStore for the data storage.
- * @param keyRange the key range descriptor.
- * @return the result data in the block.
- */
- private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
- final String blockId,
- final InterTaskDataStoreProperty.Value blockStore,
- final KeyRange keyRange) {
- final BlockStore store = getBlockStore(blockStore);
-
- // First, try to fetch the block from local BlockStore.
- final Optional<Block> optionalBlock = store.readBlock(blockId);
-
- if (optionalBlock.isPresent()) {
- final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
- handleUsedData(blockStore, blockId);
-
- // Block resides in this evaluator!
- try {
- final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
- long numSerializedBytes = 0;
- long numEncodedBytes = 0;
- try {
- for (final NonSerializedPartition partition : partitions) {
- numSerializedBytes += partition.getNumSerializedBytes();
- numEncodedBytes += partition.getNumEncodedBytes();
- }
-
- return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
- numEncodedBytes));
- } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
- return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
- }
- } catch (final IOException e) {
- throw new BlockFetchException(e);
- }
- } else {
- // We don't have the block here...
- throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
- }
- }
-
- /**
* Inquiries the location of the specific block and routes the request to the local block manager worker
* or to the lower data plane.
* This can be invoked multiple times per blockId (maybe due to failures).
@@ -184,7 +145,7 @@ public final class BlockManagerWorker {
* @param keyRange the key range descriptor
* @return the {@link CompletableFuture} of the block.
*/
- public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
+ public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
final String blockId,
final String runtimeEdgeId,
final InterTaskDataStoreProperty.Value blockStore,
@@ -192,7 +153,10 @@ public final class BlockManagerWorker {
// Let's see if a remote worker has it
final CompletableFuture<ControlMessage.Message> blockLocationFuture =
pendingBlockLocationRequest.computeIfAbsent(blockId, blockIdToRequest -> {
- // Ask Master for the location
+ // Ask Master for the location.
+ // (IMPORTANT): This 'request' effectively blocks the TaskExecutor thread if the block is IN_PROGRESS.
+ // We use this property to make the receiver task of a 'push' edge to wait in an Executor for its input data
+ // to become available.
final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = persistentConnectionToMasterMap
.getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
ControlMessage.Message.newBuilder()
@@ -221,7 +185,7 @@ public final class BlockManagerWorker {
responseFromMaster.getBlockLocationInfoMsg();
if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
throw new BlockFetchException(new Throwable(
- "Block " + blockId + " not found both in any storage: "
+ "Block " + blockId + " location unknown: "
+ "The block state is " + blockLocationInfoMsg.getState()));
}
// This is the executor id that we wanted to know
@@ -236,11 +200,24 @@ public final class BlockManagerWorker {
.setRuntimeEdgeId(runtimeEdgeId)
.setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
.build();
- final CompletableFuture<ByteInputContext> contextFuture = blockTransferConnectionQueue
- .requestConnectPermission(runtimeEdgeId)
+ final CompletableFuture<ByteInputContext> contextFuture = blockTransferThrottler
+ .requestTransferPermission(runtimeEdgeId)
.thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
- contextFuture.thenApply(context -> context.getCompletedFuture()
- .thenAccept(f -> blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId)));
+
+ // whenComplete() ensures that blockTransferThrottler.onTransferFinished() is always called,
+ // even on failures. Actual failure handling and Task retry will be done by DataFetcher.
+ contextFuture.whenComplete((connectionContext, connectionThrowable) -> {
+ if (connectionThrowable != null) {
+ // Something wrong with the connection. Notify blockTransferThrottler immediately.
+ blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+ } else {
+ // Connection is okay. Notify blockTransferThrottler when the actual transfer is done, or fails.
+ connectionContext.getCompletedFuture().whenComplete((transferContext, transferThrowable) -> {
+ blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+ });
+ }
+ });
+
return contextFuture
.thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
serializerManager.getSerializer(runtimeEdgeId)));
@@ -364,48 +341,7 @@ public final class BlockManagerWorker {
}
}
- /**
- * Handles used {@link edu.snu.nemo.runtime.executor.data.block.Block}.
- *
- * @param blockStore the store which contains the block.
- * @param blockId the ID of the block.
- */
- private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
- final String blockId) {
- final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
- if (remainingExpectedRead != null) {
- if (remainingExpectedRead.decrementAndGet() == 0) {
- // This block should be discarded.
- blockToRemainingRead.remove(blockId);
- backgroundExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- removeBlock(blockId, blockStore);
- }
- });
- }
- } // If null, just keep the data in the store.
- }
-
- /**
- * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
- * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
- * @return the block store.
- */
- private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
- switch (blockStore) {
- case MemoryStore:
- return memoryStore;
- case SerializedMemoryStore:
- return serializedMemoryStore;
- case LocalFileStore:
- return localFileStore;
- case GlusterFileStore:
- return remoteFileStore;
- default:
- throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
- }
- }
+ //////////////////////////////////////////////////////////// Public methods for remote block I/O
/**
* Respond to a block request by another executor.
@@ -472,6 +408,101 @@ public final class BlockManagerWorker {
throw new IllegalStateException("No logic here");
}
+ //////////////////////////////////////////////////////////// Private helper methods
+
+ /**
+ * Retrieves data from a block stored in this executor's local {@link BlockStore}.
+ * A specific hash value range can be designated via the key range.
+ * Reading the block is counted as one use through {@code handleUsedData}, which may schedule the block's removal.
+ *
+ * @param blockId of the block.
+ * @param blockStore for the data storage.
+ * @param keyRange the key range descriptor.
+ * @return an already-completed future holding an iterator over the block's partitions in the key range.
+ * The iterator carries the summed serialized/encoded byte counts, unless summing them raises
+ * {@code NumBytesNotSupportedException}, in which case the iterator is returned without byte counts.
+ * @throws BlockFetchException if an {@link IOException} is raised while building the iterator or byte counts.
+ * @throws RuntimeException if the block is not found in the local store.
+ */
+ private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
+ final String blockId,
+ final InterTaskDataStoreProperty.Value blockStore,
+ final KeyRange keyRange) {
+ final BlockStore store = getBlockStore(blockStore);
+
+ // First, try to fetch the block from local BlockStore.
+ final Optional<Block> optionalBlock = store.readBlock(blockId);
+
+ if (optionalBlock.isPresent()) {
+ final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
+ handleUsedData(blockStore, blockId); // NOTE(review): may submit removeBlock() before the iterator is consumed; confirm partitions stay valid
+
+ // Block resides in this executor: build the iterator, then try to attach byte counts.
+ try {
+ final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator(); // NOTE(review): raw Iterator type
+ long numSerializedBytes = 0;
+ long numEncodedBytes = 0;
+ try {
+ for (final NonSerializedPartition partition : partitions) { // second pass over the same partitions to sum sizes
+ numSerializedBytes += partition.getNumSerializedBytes();
+ numEncodedBytes += partition.getNumEncodedBytes();
+ }
+
+ return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
+ numEncodedBytes));
+ } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+ return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator)); // fall back: no byte counts
+ }
+ } catch (final IOException e) {
+ throw new BlockFetchException(e);
+ }
+ } else {
+ // The block is not present in the local store.
+ throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
+ }
+ }
+
+
+ /**
+ * Handles used {@link edu.snu.nemo.runtime.executor.data.block.Block}.
+ * If a remaining expected read count is registered for the block in {@code blockToRemainingRead},
+ * decrements it. When it reaches zero, the entry is removed and {@code removeBlock(blockId, blockStore)}
+ * is submitted to the background executor service. Blocks without a registered count are kept in the store.
+ *
+ * @param blockStore the store which contains the block.
+ * @param blockId the ID of the block.
+ */
+ private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
+ final String blockId) {
+ final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
+ if (remainingExpectedRead != null) {
+ if (remainingExpectedRead.decrementAndGet() == 0) { // atomic decrement of the remaining-read counter
+ // This block should be discarded.
+ blockToRemainingRead.remove(blockId);
+ backgroundExecutorService.submit(new Runnable() { // removal runs asynchronously, off the caller's thread
+ @Override
+ public void run() {
+ removeBlock(blockId, blockStore);
+ }
+ });
+ }
+ } // If null, just keep the data in the store.
+ }
+
+ //////////////////////////////////////////////////////////// Converters
+
+ /**
+ * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
+ * MemoryStore, SerializedMemoryStore and LocalFileStore map to the same-named stores;
+ * GlusterFileStore maps to the remote file store.
+ * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
+ * @return the block store.
+ * @throws UnsupportedBlockStoreException if the value has no matching store.
+ */
+ private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
+ switch (blockStore) {
+ case MemoryStore:
+ return memoryStore;
+ case SerializedMemoryStore:
+ return serializedMemoryStore;
+ case LocalFileStore:
+ return localFileStore;
+ case GlusterFileStore:
+ return remoteFileStore; // GlusterFileStore is served by the remote file store
+ default:
+ throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
+ }
+ }
+
+
/**
* Decodes BlockStore property from protocol buffer.
* @param blockStore property from protocol buffer
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
similarity index 86%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
index 0870082..8d9f8a1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
@@ -17,6 +17,8 @@ package edu.snu.nemo.runtime.executor.data;
import edu.snu.nemo.conf.JobConf;
import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.ArrayDeque;
@@ -30,13 +32,14 @@ import java.util.concurrent.CompletableFuture;
* Executors can suffer from performance degradation and network-related exceptions when there are massive connections,
* especially under low network bandwidth or high volume of data.
*/
-public final class BlockTransferConnectionQueue {
+public final class BlockTransferThrottler {
+ private static final Logger LOG = LoggerFactory.getLogger(BlockTransferThrottler.class.getName());
private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = new HashMap<>();
private final Map<String, Queue<CompletableFuture<Void>>> runtimeEdgeIdToPendingConnections = new HashMap<>();
private final int maxNum;
@Inject
- private BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
+ private BlockTransferThrottler(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
this.maxNum = maxNum;
}
@@ -45,7 +48,7 @@ public final class BlockTransferConnectionQueue {
* @param runtimeEdgeId the corresponding runtime edge id.
* @return a future that will be completed when the connection is granted.
*/
- public synchronized CompletableFuture<Void> requestConnectPermission(final String runtimeEdgeId) {
+ public synchronized CompletableFuture<Void> requestTransferPermission(final String runtimeEdgeId) {
runtimeEdgeIdToNumCurrentConnections.putIfAbsent(runtimeEdgeId, 0);
runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>());
final int currentOutstandingConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
@@ -63,10 +66,10 @@ public final class BlockTransferConnectionQueue {
}
/**
- * Indicates the connection has finished.
+ * Indicates the transfer has finished.
* @param runtimeEdgeId the corresponding runtime edge id.
*/
- public synchronized void onConnectionFinished(final String runtimeEdgeId) {
+ public synchronized void onTransferFinished(final String runtimeEdgeId) {
final Queue<CompletableFuture<Void>> pendingConnections = runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
if (pendingConnections.size() == 0) {
// Just decrease the number of current connections.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 2b29b5f..e417c82 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -31,6 +31,8 @@ import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
import edu.snu.nemo.runtime.common.data.HashRange;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
import edu.snu.nemo.runtime.executor.data.DataUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,7 @@ import java.util.stream.StreamSupport;
* Represents the input data transfer to a task.
*/
public final class InputReader extends DataTransfer {
+ private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
private final int dstTaskIndex;
private final BlockManagerWorker blockManagerWorker;
@@ -89,7 +92,7 @@ public final class InputReader extends DataTransfer {
final String blockId = getBlockId(dstTaskIndex);
final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
= runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
- return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
+ return blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
}
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
@@ -100,7 +103,7 @@ public final class InputReader extends DataTransfer {
final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
final String blockId = getBlockId(srcTaskIdx);
- futures.add(blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all()));
+ futures.add(blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all()));
}
return futures;
@@ -127,7 +130,7 @@ public final class InputReader extends DataTransfer {
for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
final String blockId = getBlockId(srcTaskIdx);
futures.add(
- blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), hashRangeToRead));
+ blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), hashRangeToRead));
}
return futures;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 741bb30..bfcd040 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -325,7 +325,9 @@ public final class TaskExecutor {
}
private void handleMainOutputElement(final VertexHarness harness, final Object element) {
- harness.getWritersToChildrenTasks().forEach(outputWriter -> outputWriter.write(element));
+ harness.getWritersToChildrenTasks().forEach(outputWriter -> {
+ outputWriter.write(element);
+ });
if (harness.getSideInputChildren().size() > 0) {
sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
}
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
similarity index 73%
rename from runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
index eeb0031..2c815b7 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
@@ -16,7 +16,6 @@
package edu.snu.nemo.runtime.executor.data;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
@@ -29,22 +28,22 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertFalse;
-public final class BlockTransferConnectionQueueTest {
- private static final String THREAD_NAME = BlockTransferConnectionQueue.class.getSimpleName() + "-TestThread";
+public final class BlockTransferThrottlerTest {
+ private static final String THREAD_NAME = BlockTransferThrottler.class.getSimpleName() + "-TestThread";
private static final String RUNTIME_EDGE_0 = "RuntimeEdge0";
private static final int WAIT_TIME = 1000;
/**
- * Creates {@link BlockTransferConnectionQueue} for testing.
+ * Creates {@link BlockTransferThrottler} for testing.
* @param maxNum value for {@link JobConf.MaxNumDownloadsForARuntimeEdge} parameter.
- * @return {@link BlockTransferConnectionQueue} object created.
+ * @return {@link BlockTransferThrottler} object created.
*/
- private final BlockTransferConnectionQueue getQueue(final int maxNum) {
+ private final BlockTransferThrottler getQueue(final int maxNum) {
final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
.bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, String.valueOf(maxNum))
.build();
final Injector injector = Tang.Factory.getTang().newInjector(conf);
try {
- return injector.getInstance(BlockTransferConnectionQueue.class);
+ return injector.getInstance(BlockTransferThrottler.class);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
@@ -54,13 +53,13 @@ public final class BlockTransferConnectionQueueTest {
public void test() throws InterruptedException, ExecutionException {
final ExecutorService executorService = Executors.newSingleThreadExecutor(
runnable -> new Thread(runnable, THREAD_NAME));
- final BlockTransferConnectionQueue queue = getQueue(3);
+ final BlockTransferThrottler queue = getQueue(3);
final Future executorServiceFuture = executorService.submit(() -> {
try {
- queue.requestConnectPermission(RUNTIME_EDGE_0).get();
- queue.requestConnectPermission(RUNTIME_EDGE_0).get();
- queue.requestConnectPermission(RUNTIME_EDGE_0).get();
- queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+ queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+ queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+ queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+ queue.requestTransferPermission(RUNTIME_EDGE_0).get();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -68,7 +67,7 @@ public final class BlockTransferConnectionQueueTest {
Thread.sleep(WAIT_TIME);
// We must have one pending connection request.
assertFalse(executorServiceFuture.isDone());
- queue.onConnectionFinished(RUNTIME_EDGE_0);
+ queue.onTransferFinished(RUNTIME_EDGE_0);
// The remaining request should be accepted before test timeout.
executorServiceFuture.get();
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index da7c1ef..f07fe1b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -139,7 +139,6 @@ public final class BlockManagerMaster {
*/
public Set<String> removeWorker(final String executorId) {
final Set<String> tasksToRecompute = new HashSet<>();
- LOG.warn("Worker {} is removed.", new Object[]{executorId});
final Lock writeLock = lock.writeLock();
writeLock.lock();
@@ -252,11 +251,10 @@ public final class BlockManagerMaster {
writeLock.lock();
try {
if (producerTaskIdToBlockIds.containsKey(failedTaskId)) {
- LOG.info("ProducerTask {} failed for a list of blocks:", failedTaskId);
producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
final BlockState.State state = (BlockState.State)
blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
- LOG.info("Partition lost: {}", blockId);
+ LOG.info("Block lost: {}", blockId);
onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
});
} // else this task does not produce any block
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c5957db..9e6d471 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -61,23 +61,23 @@ final class BlockMetadata {
switch (newState) {
case IN_PROGRESS:
- stateMachine.setState(newState);
break;
case NOT_AVAILABLE:
- LOG.info("Block {} lost in {}", new Object[]{blockId, location});
// Reset the block location and committer information.
locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
locationHandler = new BlockManagerMaster.BlockLocationRequestHandler(blockId);
- stateMachine.setState(newState);
break;
case AVAILABLE:
- assert (location != null);
+ if (location == null) {
+ throw new RuntimeException("Null location");
+ }
locationHandler.complete(location);
- stateMachine.setState(newState);
break;
default:
throw new UnsupportedOperationException(newState.toString());
}
+
+ stateMachine.setState(newState);
}
/**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index fd3e582..9ef2f8e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -249,7 +249,7 @@ public final class JobStateManager {
if (newState == JobState.State.EXECUTING) {
LOG.debug("Executing Job ID {}...", this.jobId);
} else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
- LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
+ LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
// Awake all threads waiting the finish of this job.
finishLock.lock();
@@ -354,7 +354,7 @@ public final class JobStateManager {
file.getParentFile().mkdirs();
try (final PrintWriter printWriter = new PrintWriter(file)) {
printWriter.println(toStringWithPhysicalPlan());
- LOG.info(String.format("JSON representation of job state for %s(%s) was saved to %s",
+ LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
jobId, suffix, file.getPath()));
} catch (final IOException e) {
LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 43e46cc..806f000 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -108,7 +108,8 @@ public final class RuntimeMaster {
// since the processing logic in master takes a very short amount of time
// compared to the job completion times of executed jobs
// and keeping it single threaded removes the complexity of multi-thread synchronization.
- this.runtimeMasterThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster"));
+ this.runtimeMasterThread =
+ Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
this.scheduler = scheduler;
this.containerManager = containerManager;
this.blockManagerMaster = blockManagerMaster;
@@ -218,13 +219,13 @@ public final class RuntimeMaster {
for (int i = 0; i < jsonRootNode.size(); i++) {
final TreeNode resourceNode = jsonRootNode.get(i);
- final ResourceSpecification.Builder builder = ResourceSpecification.newBuilder();
- builder.setContainerType(resourceNode.get("type").traverse().nextTextValue());
- builder.setMemory(resourceNode.get("memory_mb").traverse().getIntValue());
- builder.setCapacity(resourceNode.get("capacity").traverse().getIntValue());
+ final String type = resourceNode.get("type").traverse().nextTextValue();
+ final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
+ final int capacity = resourceNode.get("capacity").traverse().getIntValue();
final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
+ final int poisonSec = resourceNode.path("poison_sec").traverse().nextIntValue(-1);
resourceRequestCount.getAndAdd(executorNum);
- containerManager.requestContainer(executorNum, builder.build());
+ containerManager.requestContainer(executorNum, new ResourceSpecification(type, capacity, memory, poisonSec));
}
metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
} catch (final Exception e) {
@@ -288,7 +289,6 @@ public final class RuntimeMaster {
*/
public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
runtimeMasterThread.execute(() -> {
- LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
metricCountDownLatch.countDown();
// Note that getFailedContextList() can be empty if the failure occurred
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
index 32d4bd9..a099a11 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.master.resource;
import edu.snu.nemo.common.exception.ContainerException;
import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.message.FailedMessageSender;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageSender;
import org.apache.reef.annotations.audience.DriverSide;
@@ -25,6 +26,8 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +36,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -146,7 +150,12 @@ public final class ContainerManager {
+ ") allocated, will be used for [" + executorId + "]");
pendingContextIdToResourceSpec.put(executorId, resourceSpecification);
- allocatedContainer.submitContext(executorConfiguration);
+ // Poison handling
+ final Configuration poisonConfiguration = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(JobConf.ExecutorPosionSec.class, String.valueOf(resourceSpecification.getPoisonSec()))
+ .build();
+
+ allocatedContainer.submitContext(Configurations.merge(executorConfiguration, poisonConfiguration));
}
/**
@@ -165,16 +174,16 @@ public final class ContainerManager {
// We set contextId = executorId in NemoDriver when we generate executor configuration.
final String executorId = activeContext.getId();
-
final ResourceSpecification resourceSpec = pendingContextIdToResourceSpec.remove(executorId);
// Connect to the executor and initiate Master side's executor representation.
- final MessageSender messageSender;
+ MessageSender messageSender;
try {
messageSender =
messageEnvironment.asyncConnect(executorId, MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID).get();
- } catch (final Exception e) {
- throw new RuntimeException(e);
+ } catch (final InterruptedException | ExecutionException e) {
+ // TODO #140: Properly classify and handle each RPC failure
+ messageSender = new FailedMessageSender();
}
// Create the executor representation.
@@ -182,9 +191,8 @@ public final class ContainerManager {
new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
- LOG.info("{} is up and running at {}", executorId, executorRepresenter.getNodeName());
-
requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
+
return Optional.of(executorRepresenter);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e431074..e0df766 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -170,24 +170,6 @@ public final class ExecutorRepresenter {
return runningTasks;
}
- public Map<Task, Integer> getRunningTaskToAttempt() {
- return runningTaskToAttempt;
- }
-
- /**
- * @return set of ids of Tasks that have been failed in this exeuctor
-
- public Set<String> getFailedTasks() {
- return failedTasks;
- }
-
- /**
- * @return set of ids of Tasks that have been completed in this executor
- */
- public Set<Task> getCompleteTasks() {
- return completeTasks;
- }
-
/**
* @return the executor id
*/
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
index 6a2b2b8..173a477 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
@@ -26,14 +26,23 @@ public final class ResourceSpecification {
private final String containerType;
private final int capacity;
private final int memory;
+ private final int poisonSec; // -1 if this resources is not poisoned
public ResourceSpecification(final String containerType,
final int capacity,
final int memory) {
+ this(containerType, capacity, memory, -1);
+ }
+
+ public ResourceSpecification(final String containerType,
+ final int capacity,
+ final int memory,
+ final int poisonSec) {
this.resourceSpecId = RuntimeIdGenerator.generateResourceSpecId();
this.containerType = containerType;
this.capacity = capacity;
this.memory = memory;
+ this.poisonSec = poisonSec;
}
/**
@@ -62,58 +71,10 @@ public final class ResourceSpecification {
}
/**
- * @return {@link Builder} for {@link ResourceSpecification}.
+ * @return -1 if this resource is not poisoned. (for all other normal cases)
+ * >= 0 the expected time to failure by poison. (for fault-handling tests)
*/
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * A Builder class for {@link ResourceSpecification}.
- */
- public static final class Builder {
- private String containerType;
- private Integer capacity;
- private Integer memory;
-
- private Builder() {
- }
-
- /**
- * @param inputContainerType the container type
- * @return {@link Builder} object.
- */
- public Builder setContainerType(final String inputContainerType) {
- this.containerType = inputContainerType;
- return this;
- }
-
- /**
- * @param inputCapacity the number of Tasks that can be run in this container
- * @return {@link Builder} object.
- */
- public Builder setCapacity(final int inputCapacity) {
- this.capacity = inputCapacity;
- return this;
- }
-
- /**
- * @param inputMemory the size of the memory allocated, in megabytes
- * @return {@link Builder} object.
- */
- public Builder setMemory(final int inputMemory) {
- this.memory = inputMemory;
- return this;
- }
-
- /**
- * @return the {@link ResourceSpecification} object that has been built
- */
- public ResourceSpecification build() {
- assert (containerType != null);
- assert (capacity != null);
- assert (memory != null);
- return new ResourceSpecification(containerType, capacity, memory);
- }
+ public int getPoisonSec() {
+ return poisonSec;
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 880c8d0..531fced 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -192,7 +192,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
}
break;
case SHOULD_RETRY:
- // Retry the failed task
+ // Do retry
doSchedule();
break;
default:
@@ -224,13 +224,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
@Override
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
- LOG.info("{} added", executorRepresenter.getExecutorId());
+ LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
executorRegistry.registerExecutor(executorRepresenter);
schedulerRunner.onExecutorSlotAvailable();
}
@Override
public void onExecutorRemoved(final String executorId) {
+ LOG.info("{} removed", executorId);
blockManagerMaster.removeWorker(executorId);
// These are tasks that were running at the time of executor removal.
@@ -240,14 +241,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
});
- // We need to retry the interrupted tasks, and also recover the tasks' missing input blocks if needed.
- final Set<String> tasksToReExecute =
- Sets.union(interruptedTasks, recursivelyGetParentTasksForLostBlocks(interruptedTasks));
-
- // Report SHOULD_RETRY tasks so they can be re-scheduled
- LOG.info("{} removed: {} will be retried", executorId, tasksToReExecute);
- tasksToReExecute.forEach(
- taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+ // Retry the interrupted tasks (and required parents)
+ retryTasksAndRequiredParents(interruptedTasks);
// Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
doSchedule();
@@ -263,8 +258,11 @@ public final class BatchSingleJobScheduler implements Scheduler {
/**
* The main entry point for task scheduling.
- * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects,
- * and integrate well with {@link PendingTaskCollectionPointer} and {@link SchedulerRunner}.
+ * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
+ *
+ * These are the reasons why.
+ * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
+ * - We make {@link SchedulerRunner} run only tasks that are READY.
*/
private void doSchedule() {
final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -275,8 +273,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
.flatMap(stage -> selectSchedulableTasks(stage).stream())
.collect(Collectors.toList());
- LOG.info("Attempting to schedule {} in the same ScheduleGroup",
- tasksToSchedule.stream().map(Task::getTaskId).collect(Collectors.toList()));
+ // We prefer (but not guarantee) to schedule the 'receiving' tasks first,
+ // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
+ Collections.reverse(tasksToSchedule);
+
+ LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+ .map(Task::getTaskId)
+ .map(RuntimeIdGenerator::getStageIdFromTaskId)
+ .collect(Collectors.toSet()));
// Set the pointer to the schedulable tasks.
pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
@@ -289,6 +293,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
}
private Optional<List<Stage>> selectEarliestSchedulableGroup() {
+ if (sortedScheduleGroups == null) {
+ return Optional.empty();
+ }
+
return sortedScheduleGroups.stream()
.filter(scheduleGroup -> scheduleGroup.stream()
.map(Stage::getId)
@@ -431,10 +439,20 @@ public final class BatchSingleJobScheduler implements Scheduler {
default:
throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
}
+
+ retryTasksAndRequiredParents(Collections.singleton(taskId));
}
////////////////////////////////////////////////////////////////////// Helper methods
+ private void retryTasksAndRequiredParents(final Set<String> tasks) {
+ final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
+ final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
+ LOG.info("Will be retried: {}", tasksToRetry);
+ tasksToRetry.forEach(
+ taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+ }
+
private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
if (children.isEmpty()) {
return Collections.emptySet();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index caf0d40..0b0524e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -66,7 +66,8 @@ public final class SchedulerRunner {
final ExecutorRegistry executorRegistry) {
this.jobStateManagers = new HashMap<>();
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
- this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
+ this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
+ new Thread(runnable, "SchedulerRunner thread"));
this.isSchedulerRunning = false;
this.isTerminated = false;
this.executorRegistry = executorRegistry;
@@ -114,7 +115,6 @@ public final class SchedulerRunner {
continue;
}
- LOG.debug("Trying to schedule {}...", task.getTaskId());
executorRegistry.viewExecutors(executors -> {
final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
task.getExecutionProperties().forEachProperties(property -> {
@@ -132,6 +132,8 @@ public final class SchedulerRunner {
// update metadata first
jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+ LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
+
// send the task
selectedExecutor.onTaskScheduled(task);
} else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
index 18dd6cf..441dd31 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
@@ -107,7 +107,7 @@ public final class ContainerManagerTest {
containerManager.onContainerAllocated(
executorId,
createMockEvaluator(evaluatorId, descriptor),
- mock(Configuration.class));
+ createMockConfiguration());
final ExecutorRepresenter executorRepresenter =
containerManager.onContainerLaunched(createMockContext(executorId, descriptor)).get();
assertEquals(spec.getContainerType(), executorRepresenter.getContainerType());
@@ -125,7 +125,7 @@ public final class ContainerManagerTest {
containerManager.onContainerAllocated(
getExecutorId(),
createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)),
- mock(Configuration.class));
+ createMockConfiguration());
assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
}
@@ -139,7 +139,7 @@ public final class ContainerManagerTest {
containerManager.onContainerAllocated(
executorId,
createMockEvaluator(evaluatorId, descriptor),
- mock(Configuration.class));
+ createMockConfiguration());
containerManager.onContainerLaunched(createMockContext(executorId, descriptor));
assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
}
@@ -170,4 +170,8 @@ public final class ContainerManagerTest {
when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor);
return mockedContext;
}
+
+ private Configuration createMockConfiguration() {
+ return Tang.Factory.getTang().newConfigurationBuilder().build();
+ }
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
similarity index 99%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index 7064a23..4b6af1a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -59,10 +59,10 @@ import static org.mockito.Mockito.mock;
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class TaskRestartTest {
+public final class TaskRetryTest {
@Rule public TestName testName = new TestName();
- private static final Logger LOG = LoggerFactory.getLogger(TaskRestartTest.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(TaskRetryTest.class.getName());
private static final AtomicInteger ID_OFFSET = new AtomicInteger(1);
private Random random;