You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/08/09 04:56:47 UTC

[GitHub] sanha closed pull request #94: [NEMO-152] Support Multiple DAGs Execution in a Single User Program

sanha closed pull request #94: [NEMO-152] Support Multiple DAGs Execution in a Single User Program
URL: https://github.com/apache/incubator-nemo/pull/94
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it has been merged, the full diff is reproduced below for
the sake of provenance:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 582f30acb..f41146b64 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -44,15 +44,15 @@ def stateToColor(state):
     except:
         return 'white'
 
-class JobState:
+class PlanState:
     def __init__(self, data):
-        self.id = data['jobId']
+        self.id = data['planId']
         self.stages = {}
         for stage in data['stages']:
             self.stages[stage['id']] = StageState(stage)
     @classmethod
     def empty(cls):
-        return cls({'jobId': None, 'stages': []})
+        return cls({'planId': None, 'stages': []})
     def get(self, id):
         try:
             return self.stages[id]
@@ -96,11 +96,11 @@ class DAG:
     A class for converting DAG to Graphviz representation.
     JSON representation should be formatted like what toString method in DAG.java does.
     '''
-    def __init__(self, dag, jobState):
+    def __init__(self, dag, planState):
         self.vertices = {}
         self.edges = []
         for vertex in dag['vertices']:
-            self.vertices[vertex['id']] = Vertex(vertex['id'], vertex['properties'], jobState.get(vertex['id']))
+            self.vertices[vertex['id']] = Vertex(vertex['id'], vertex['properties'], planState.get(vertex['id']))
         for edge in dag['edges']:
             self.edges.append(Edge(self.vertices[edge['src']], self.vertices[edge['dst']], edge['properties']))
     @property
@@ -178,7 +178,7 @@ def logicalEnd(self):
 class LoopVertex:
     def __init__(self, id, properties):
         self.id = id
-        self.dag = DAG(properties['DAG'], JobState.empty())
+        self.dag = DAG(properties['DAG'], PlanState.empty())
         self.remaining_iteration = properties['remainingIteration']
         self.executionProperties = properties['executionProperties']
         self.incoming = properties['dagIncomingEdges']
@@ -217,7 +217,7 @@ class Stage:
     def __init__(self, id, properties, state):
         self.id = id
         self.properties = properties
-        self.stageDAG = DAG(properties['irDag'], JobState.empty())
+        self.stageDAG = DAG(properties['irDag'], PlanState.empty())
         self.idx = getIdx()
         self.state = state
         self.executionProperties = self.properties['executionProperties']
@@ -316,9 +316,9 @@ def dot(self):
 
 def jsonToDot(jsonDict):
     try:
-        dag = DAG(jsonDict['dag'], JobState(jsonDict['jobState']))
+        dag = DAG(jsonDict['dag'], PlanState(jsonDict['planState']))
     except:
-        dag = DAG(jsonDict, JobState.empty())
+        dag = DAG(jsonDict, PlanState.empty())
     return 'digraph dag {compound=true; nodesep=1.0; forcelabels=true;' + dag.dot + '}'
 
 if __name__ == "__main__":
diff --git a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
index 2d27647ea..b31807ad3 100644
--- a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -24,12 +24,12 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A request endpoint in client side of a job.
+ * A request endpoint in client side of a plan.
  */
 public abstract class ClientEndpoint {
 
   /**
-   * The request endpoint in driver side of the job.
+   * The request endpoint in driver side of the plan.
    */
   private final AtomicReference<DriverEndpoint> driverEndpoint;
 
@@ -41,13 +41,13 @@
   private static final long DEFAULT_DRIVER_WAIT_IN_MILLIS = 100;
 
   /**
-   * A {@link StateTranslator} for this job.
+   * A {@link StateTranslator} for this plan.
    */
   private final StateTranslator stateTranslator;
 
   /**
    * Constructor.
-   * @param stateTranslator translator to translate between the state of job and corresponding.
+   * @param stateTranslator translates the state of the plan into the corresponding client endpoint state.
    */
   public ClientEndpoint(final StateTranslator stateTranslator) {
     this.driverEndpoint = new AtomicReference<>();
@@ -57,7 +57,7 @@ public ClientEndpoint(final StateTranslator stateTranslator) {
   }
 
   /**
-   * Connect the driver endpoint of this job.
+   * Connect the driver endpoint of this plan.
    * This method will be called by {@link DriverEndpoint}.
    *
    * @param dep connected with this client.
@@ -122,15 +122,15 @@ private boolean waitUntilConnected() {
   }
 
   /**
-   * Get the current state of the running job.
+   * Get the current state of the running plan.
    *
-   * @return the current state of the running job.
+   * @return the current state of the running plan.
    */
-  public final synchronized Enum getJobState() {
+  public final synchronized Enum getPlanState() {
     if (driverEndpoint.get() != null) {
       return stateTranslator.translateState(driverEndpoint.get().getState());
     } else {
-      return stateTranslator.translateState(JobState.State.READY);
+      return stateTranslator.translateState(PlanState.State.READY);
     }
   }
 
@@ -161,7 +161,7 @@ public final Enum waitUntilJobFinish(final long timeout,
         return stateTranslator.translateState(driverEndpoint.get().
             waitUntilFinish(timeout - unit.convert(consumedTime, TimeUnit.NANOSECONDS), unit));
       } else {
-        return JobState.State.READY;
+        return PlanState.State.READY;
       }
     }
   }
@@ -181,7 +181,7 @@ public final Enum waitUntilJobFinish() {
       if (driverIsConnected) {
         return stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
       } else {
-        return JobState.State.READY;
+        return PlanState.State.READY;
       }
     }
   }
diff --git a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
index fc33eeae2..4338877b9 100644
--- a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
@@ -15,67 +15,67 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.common.state.PlanState;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * A request endpoint in driver side of a job.
+ * A request endpoint in driver side of a plan.
  */
 public final class DriverEndpoint {
 
   /**
-   * The {@link JobStateManager} of the running job.
+   * The {@link PlanStateManager} of the running plan.
    */
-  private final JobStateManager jobStateManager;
+  private final PlanStateManager planStateManager;
 
   /**
-   * The {@link ClientEndpoint} of the job.
+   * The {@link ClientEndpoint} of the plan.
    */
   private final ClientEndpoint clientEndpoint;
 
   /**
    * Construct an endpoint in driver side.
    * This method will be called by {@link ClientEndpoint}.
-   * @param jobStateManager of running job.
-   * @param clientEndpoint of running job.
+   * @param planStateManager of running plan.
+   * @param clientEndpoint of running plan.
    */
-  public DriverEndpoint(final JobStateManager jobStateManager,
+  public DriverEndpoint(final PlanStateManager planStateManager,
                         final ClientEndpoint clientEndpoint) {
-    this.jobStateManager = jobStateManager;
+    this.planStateManager = planStateManager;
     this.clientEndpoint = clientEndpoint;
     clientEndpoint.connectDriver(this);
   }
 
   /**
-   * Get the current state of the running job.
+   * Get the current state of the running plan.
    * This method will be called by {@link ClientEndpoint}.
-   * @return the current state of the running job.
+   * @return the current state of the running plan.
    */
-  JobState.State getState() {
-    return jobStateManager.getJobState();
+  PlanState.State getState() {
+    return planStateManager.getPlanState();
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * It wait for at most the given time.
    * This method will be called by {@link ClientEndpoint}.
    * @param timeout of waiting.
    * @param unit of the timeout.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  JobState.State waitUntilFinish(final long timeout,
-                                 final TimeUnit unit) {
-    return jobStateManager.waitUntilFinish(timeout, unit);
+  PlanState.State waitUntilFinish(final long timeout,
+                                  final TimeUnit unit) {
+    return planStateManager.waitUntilFinish(timeout, unit);
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * This method will be called by {@link ClientEndpoint}.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  JobState.State waitUntilFinish() {
-    return jobStateManager.waitUntilFinish();
+  PlanState.State waitUntilFinish() {
+    return planStateManager.waitUntilFinish();
   }
 }
diff --git a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
index 35082d752..287a0a070 100644
--- a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
+++ b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
@@ -15,19 +15,18 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 /**
- * A class provides the translation between the state of job and corresponding
- * {@link ClientEndpoint}.
+ * Translates between the state of a plan and that of the corresponding {@link ClientEndpoint}.
  */
 public interface StateTranslator {
 
   /**
-   * Translate a job state of nemo to a corresponding client endpoint state.
+   * Translate a plan state of nemo to a corresponding client endpoint state.
    *
-   * @param jobState to translate.
+   * @param planState to translate.
    * @return the translated state.
    */
-  Enum translateState(final JobState.State jobState);
+  Enum translateState(final PlanState.State planState);
 }
diff --git a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
index 3f0d1a0cd..744a992f5 100644
--- a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
@@ -16,10 +16,10 @@
 package edu.snu.nemo.client;
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,32 +50,32 @@ public void testState() throws Exception {
     final StateTranslator stateTranslator = mock(StateTranslator.class);
     when(stateTranslator.translateState(any())).then(state -> state.getArgument(0));
     final ClientEndpoint clientEndpoint = new TestClientEndpoint(stateTranslator);
-    assertEquals(clientEndpoint.getJobState(), JobState.State.READY);
+    assertEquals(clientEndpoint.getPlanState(), PlanState.State.READY);
 
     // Wait for connection but not connected.
-    assertEquals(clientEndpoint.waitUntilJobFinish(100, TimeUnit.MILLISECONDS), JobState.State.READY);
+    assertEquals(clientEndpoint.waitUntilJobFinish(100, TimeUnit.MILLISECONDS), PlanState.State.READY);
 
-    // Create a JobStateManager of a dag and create a DriverEndpoint with it.
+    // Create a PlanStateManager of a dag and create a DriverEndpoint with it.
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
-    final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
+    final DriverEndpoint driverEndpoint = new DriverEndpoint(planStateManager, clientEndpoint);
 
     // Check the current state.
-    assertEquals(clientEndpoint.getJobState(), JobState.State.EXECUTING);
+    assertEquals(clientEndpoint.getPlanState(), PlanState.State.EXECUTING);
 
     // Wait for the job to finish but not finished
-    assertEquals(clientEndpoint.waitUntilJobFinish(100, TimeUnit.MILLISECONDS), JobState.State.EXECUTING);
+    assertEquals(clientEndpoint.waitUntilJobFinish(100, TimeUnit.MILLISECONDS), PlanState.State.EXECUTING);
 
     // Check finish.
     final List<String> tasks = physicalPlan.getStageDAG().getTopologicalSort().stream()
         .flatMap(stage -> stage.getTaskIds().stream())
         .collect(Collectors.toList());
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING));
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE));
-    assertEquals(JobState.State.COMPLETE, clientEndpoint.waitUntilJobFinish());
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING));
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE));
+    assertEquals(PlanState.State.COMPLETE, clientEndpoint.waitUntilJobFinish());
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index aeaea6fbb..593a99b54 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -27,7 +27,6 @@
 import java.io.Serializable;
 import java.util.*;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -83,17 +82,6 @@ public DAG(final Set<V> vertices,
     loopStackDepthMap.forEach(((v, integer) -> this.loopStackDepthMap.put(v.getId(), integer)));
   }
 
-  /**
-   * Converts a DAG into another DAG according to a function.
-   * @param function to apply when converting a DAG to another.
-   * @param <V2> the converted DAG's vertex type.
-   * @param <E2> the converted DAG's edge type.
-   * @return the converted DAG.
-   */
-  public <V2 extends Vertex, E2 extends Edge<V2>> DAG<V2, E2> convert(final Function<DAG<V, E>, DAG<V2, E2>> function) {
-    return function.apply(this);
-  }
-
   /**
    * Retrieves the vertex given its ID.
    * @param id of the vertex to retrieve
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 465fb7709..3e933a2e5 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -60,8 +60,7 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) throws Exception
    */
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
-    final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
-    final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
-    return physicalPlan;
+    final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
+    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
index 0212c20aa..3f420e891 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.compiler.frontend.beam;
 
 import edu.snu.nemo.client.StateTranslator;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import static org.apache.beam.sdk.PipelineResult.State.*;
 
@@ -33,7 +33,7 @@
    * @return the translated state.
    */
   @Override
-  public Enum translateState(final JobState.State jobState) {
+  public Enum translateState(final PlanState.State jobState) {
     switch (jobState) {
       case READY:
         return RUNNING;
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
index ffd09cfca..cf0ab51fb 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -37,7 +37,7 @@ public NemoPipelineResult() {
 
   @Override
   public State getState() {
-    return (State) super.getJobState();
+    return (State) super.getPlanState();
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
index 65515cc35..9393ec6be 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
@@ -321,6 +321,9 @@ public SparkSession getOrCreate() {
       if (!options.containsKey("spark.master")) { // default spark_master option.
         return this.master("local[*]").getOrCreate();
       }
+      if (!options.containsKey("spark.driver.allowMultipleContexts")) {
+        return this.config("spark.driver.allowMultipleContexts", "true").getOrCreate();
+      }
 
       UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("ubuntu"));
 
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
index 65160121b..60d897bb9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
@@ -26,6 +26,7 @@
 /**
  * An interface for policies, each of which is composed of a list of static optimization passes.
  * The list of static optimization passes are run in the order provided by the implementation.
+ * Most policies follow the implementation in {@link PolicyImpl}.
  */
 public interface Policy extends Serializable {
   /**
diff --git a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
index b4237bfdd..52cc00675 100644
--- a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -76,7 +76,7 @@ public void testSimplePlan() throws Exception {
     final DAG<IRVertex, IREdge> irDAG = new TestPolicy().runCompileTimeOptimization(
         irDAGBuilder.buildWithoutSourceSinkCheck(), DAG.EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> DAGOfStages = physicalPlanGenerator.stagePartitionIrDAG(irDAG);
-    final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = physicalPlanGenerator.apply(irDAG);
 
     // Test DAG of stages
     final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index e49dd0c47..31ac261c8 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -67,5 +67,25 @@ public void testSparkWordCount() throws Exception {
     }
   }
 
-  // TODO #152: enable execution of multiple jobs (call scheduleJob multiple times with caching).
+  @Test(timeout = TIMEOUT)
+  public void testSparkWordAndLineCount() throws Exception {
+    final String inputFileName = "test_input_wordcount_spark";
+    final String outputFileName = "test_output_wordcount_spark";
+    final String expectedOutputFilename = "expected_output_word_and_line_count";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+
+    JobLauncher.main(builder
+        .addJobId(JavaWordAndLineCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordAndLineCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
index fea3a20a0..ffedf21cd 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -20,7 +20,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,9 +29,9 @@
 /**
  * Metric class for Job (or {@link PhysicalPlan}).
  */
-public final class JobMetric implements StateMetric<JobState.State> {
+public final class JobMetric implements StateMetric<PlanState.State> {
   private String id;
-  private List<StateTransitionEvent<JobState.State>> stateTransitionEvents = new ArrayList<>();
+  private List<StateTransitionEvent<PlanState.State>> stateTransitionEvents = new ArrayList<>();
   private JsonNode stageDagJson;
 
   public JobMetric(final PhysicalPlan physicalPlan) {
@@ -63,12 +63,12 @@ public String getId() {
   }
 
   @Override
-  public List<StateTransitionEvent<JobState.State>> getStateTransitionEvents() {
+  public List<StateTransitionEvent<PlanState.State>> getStateTransitionEvents() {
     return stateTransitionEvents;
   }
 
   @Override
-  public void addEvent(final JobState.State prevState, final JobState.State newState) {
+  public void addEvent(final PlanState.State prevState, final PlanState.State newState) {
     stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7663a8cc2..0cfe8633a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -28,7 +28,7 @@
  * A Task is a self-contained executable that can be executed on a machine.
  */
 public final class Task implements Serializable {
-  private final String jobId;
+  private final String planId;
   private final String taskId;
   private final List<StageEdge> taskIncomingEdges;
   private final List<StageEdge> taskOutgoingEdges;
@@ -40,7 +40,7 @@
   /**
    * Constructor.
    *
-   * @param jobId                the id of the job.
+   * @param planId               the id of the physical plan.
    * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
    * @param executionProperties  {@link VertexExecutionProperty} map for the corresponding stage
@@ -49,7 +49,7 @@
    * @param taskOutgoingEdges    the outgoing edges of the task.
    * @param irVertexIdToReadable the map between IRVertex id to readable.
    */
-  public Task(final String jobId,
+  public Task(final String planId,
               final String taskId,
               final int attemptIdx,
               final ExecutionPropertyMap<VertexExecutionProperty> executionProperties,
@@ -57,7 +57,7 @@ public Task(final String jobId,
               final List<StageEdge> taskIncomingEdges,
               final List<StageEdge> taskOutgoingEdges,
               final Map<String, Readable> irVertexIdToReadable) {
-    this.jobId = jobId;
+    this.planId = planId;
     this.taskId = taskId;
     this.attemptIdx = attemptIdx;
     this.executionProperties = executionProperties;
@@ -68,10 +68,10 @@ public Task(final String jobId,
   }
 
   /**
-   * @return the id of the job.
+   * @return the id of the plan.
    */
-  public String getJobId() {
-    return jobId;
+  public String getPlanId() {
+    return planId;
   }
 
   /**
@@ -138,8 +138,8 @@ public int getAttemptIdx() {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("jobId: ");
-    sb.append(jobId);
+    sb.append("planId: ");
+    sb.append(planId);
     sb.append(" / taskId: ");
     sb.append(taskId);
     sb.append(" / attempt: ");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
similarity index 79%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
index 6d65691f5..7edafab6d 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
@@ -20,10 +20,10 @@
 /**
  * Represents the states and their transitions of a physical plan.
  */
-public final class JobState {
+public final class PlanState {
   private final StateMachine stateMachine;
 
-  public JobState() {
+  public PlanState() {
     stateMachine = buildTaskStateMachine();
   }
 
@@ -31,16 +31,16 @@ private StateMachine buildTaskStateMachine() {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The job has been created and submitted to runtime.");
-    stateMachineBuilder.addState(State.EXECUTING, "The job is executing (with its stages executing).");
-    stateMachineBuilder.addState(State.COMPLETE, "The job is complete.");
-    stateMachineBuilder.addState(State.FAILED, "Job failed.");
+    stateMachineBuilder.addState(State.READY, "The plan has been created and submitted to runtime.");
+    stateMachineBuilder.addState(State.EXECUTING, "The plan is executing (with its stages executing).");
+    stateMachineBuilder.addState(State.COMPLETE, "The plan is complete.");
+    stateMachineBuilder.addState(State.FAILED, "Plan failed.");
 
     // Add transitions
     stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
         "Begin executing!");
     stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "All stages complete, job complete");
+        "All stages complete, plan complete");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED,
         "Unrecoverable failure in a stage");
 
@@ -54,7 +54,7 @@ public StateMachine getStateMachine() {
   }
 
   /**
-   * JobState.
+   * PlanState.
    */
   public enum State {
     READY,
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 52ea468a4..6863b6f17 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.tang.Injector;
@@ -102,16 +102,16 @@ public void run(final String dagString) {
       physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical execution plan by compiler");
 
       // Execute!
-      final Pair<JobStateManager, ScheduledExecutorService> executionResult =
+      final Pair<PlanStateManager, ScheduledExecutorService> executionResult =
           runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
 
       // Wait for the job to finish and stop logging
-      final JobStateManager jobStateManager = executionResult.left();
+      final PlanStateManager planStateManager = executionResult.left();
       final ScheduledExecutorService dagLoggingExecutor = executionResult.right();
-      jobStateManager.waitUntilFinish();
+      planStateManager.waitUntilFinish();
       dagLoggingExecutor.shutdown();
 
-      jobStateManager.storeJSON(dagDirectory, "final");
+      planStateManager.storeJSON(dagDirectory, "final");
       LOG.info("{} is complete!", physicalPlan.getId());
     } catch (final Exception e) {
       throw new RuntimeException(e);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index f07fe1b20..546103972 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -50,6 +50,7 @@
 import org.slf4j.LoggerFactory;
 
 import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.NOT_AVAILABLE;
 
 /**
  * Master-side block manager.
@@ -229,8 +230,8 @@ public void onProducerTaskScheduled(final String scheduledTaskId) {
     try {
       if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
         producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
-          if (!blockIdToMetadata.get(blockId).getBlockState()
-              .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+          if (blockIdToMetadata.get(blockId).getBlockState()
+              .getStateMachine().getCurrentState().equals(NOT_AVAILABLE)) {
             onBlockStateChanged(blockId, IN_PROGRESS, null);
           }
         });
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
similarity index 73%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index 9ef2f8e87..dd9f302c2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.StageState;
 
 import java.io.File;
@@ -48,23 +48,23 @@
 import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
 
 /**
- * Maintains three levels of state machines (JobState, StageState, and TaskState) of a physical plan.
+ * Maintains three levels of state machines (PlanState, StageState, and TaskState) of a physical plan.
  * The main API this class provides is onTaskStateReportFromExecutor(), which directly changes a TaskState.
- * JobState and StageState are updated internally in the class, and can only be read from the outside.
+ * PlanState and StageState are updated internally in the class, and can only be read from the outside.
  *
  * (CONCURRENCY) The public methods of this class are synchronized.
  */
 @DriverSide
 @ThreadSafe
-public final class JobStateManager {
-  private static final Logger LOG = LoggerFactory.getLogger(JobStateManager.class.getName());
-  private final String jobId;
+public final class PlanStateManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PlanStateManager.class.getName());
+  private final String planId;
   private final int maxScheduleAttempt;
 
   /**
-   * The data structures below track the execution states of this job.
+   * The data structures below track the execution states of this plan.
    */
-  private final JobState jobState;
+  private final PlanState planState;
   private final Map<String, StageState> idToStageStates;
   private final Map<String, TaskState> idToTaskStates;
 
@@ -75,15 +75,15 @@
   private final Map<String, Integer> taskIdToCurrentAttempt;
 
   /**
-   * Represents the job to manage.
+   * Represents the plan to manage.
    */
   private final PhysicalPlan physicalPlan;
 
   /**
-   * A lock and condition to check whether the job is finished or not.
+   * A lock and condition to check whether the plan is finished or not.
    */
   private final Lock finishLock;
-  private final Condition jobFinishedCondition;
+  private final Condition planFinishedCondition;
 
   /**
    * For metrics.
@@ -92,33 +92,33 @@
 
   private MetricStore metricStore;
 
-  public JobStateManager(final PhysicalPlan physicalPlan,
-                         final MetricMessageHandler metricMessageHandler,
-                         final int maxScheduleAttempt) {
-    this.jobId = physicalPlan.getId();
+  public PlanStateManager(final PhysicalPlan physicalPlan,
+                          final MetricMessageHandler metricMessageHandler,
+                          final int maxScheduleAttempt) {
+    this.planId = physicalPlan.getId();
     this.physicalPlan = physicalPlan;
     this.metricMessageHandler = metricMessageHandler;
     this.maxScheduleAttempt = maxScheduleAttempt;
-    this.jobState = new JobState();
+    this.planState = new PlanState();
     this.idToStageStates = new HashMap<>();
     this.idToTaskStates = new HashMap<>();
     this.taskIdToCurrentAttempt = new HashMap<>();
     this.finishLock = new ReentrantLock();
-    this.jobFinishedCondition = finishLock.newCondition();
+    this.planFinishedCondition = finishLock.newCondition();
     this.metricStore = MetricStore.getStore();
 
-    metricStore.getOrCreateMetric(JobMetric.class, jobId).setStageDAG(physicalPlan.getStageDAG());
-    metricStore.triggerBroadcast(JobMetric.class, jobId);
+    metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlan.getStageDAG());
+    metricStore.triggerBroadcast(JobMetric.class, planId);
     initializeComputationStates();
   }
 
   /**
-   * Initializes the states for the job/stages/tasks for this job.
+   * Initializes the states for the plan/stages/tasks for this plan.
    */
   private void initializeComputationStates() {
-    onJobStateChanged(JobState.State.EXECUTING);
+    onPlanStateChanged(PlanState.State.EXECUTING);
 
-    // Initialize the states for the job down to task-level.
+    // Initialize the states for the plan down to task-level.
     physicalPlan.getStageDAG().topologicalDo(stage -> {
       idToStageStates.put(stage.getId(), new StageState());
       stage.getTaskIds().forEach(taskId -> {
@@ -132,9 +132,9 @@ private void initializeComputationStates() {
    * Updates the state of a task.
    * Task state changes can occur both in master and executor.
    * State changes that occur in master are
-   * initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}.
+   * initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchScheduler}.
    * State changes that occur in executors are sent to master as a control message,
-   * and the call to this method is initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}
+   * and the call to this method is initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchScheduler}
    * when the message/event is received.
    *
    * @param taskId  the ID of the task.
@@ -226,97 +226,98 @@ private void onStageStateChanged(final String stageId, final StageState.State ne
         new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
     stageStateMachine.setState(newStageState);
 
-    // Change job state if needed
+    // Change plan state if needed
     final boolean allStagesCompleted = idToStageStates.values().stream().allMatch(state ->
         state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
     if (allStagesCompleted) {
-      onJobStateChanged(JobState.State.COMPLETE);
+      onPlanStateChanged(PlanState.State.COMPLETE);
     }
   }
 
   /**
    * (PRIVATE METHOD)
-   * Updates the state of the job.
-   * @param newState of the job.
+   * Updates the state of the plan.
+   * @param newState of the plan.
    */
-  private void onJobStateChanged(final JobState.State newState) {
-    metricStore.getOrCreateMetric(JobMetric.class, jobId)
-        .addEvent((JobState.State) jobState.getStateMachine().getCurrentState(), newState);
-    metricStore.triggerBroadcast(JobMetric.class, jobId);
+  private void onPlanStateChanged(final PlanState.State newState) {
+    metricStore.getOrCreateMetric(JobMetric.class, planId)
+        .addEvent((PlanState.State) planState.getStateMachine().getCurrentState(), newState);
+    metricStore.triggerBroadcast(JobMetric.class, planId);
 
-    jobState.getStateMachine().setState(newState);
+    planState.getStateMachine().setState(newState);
 
-    if (newState == JobState.State.EXECUTING) {
-      LOG.debug("Executing Job ID {}...", this.jobId);
-    } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
-      LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
+    if (newState == PlanState.State.EXECUTING) {
+      LOG.debug("Executing Plan ID {}...", this.planId);
+    } else if (newState == PlanState.State.COMPLETE || newState == PlanState.State.FAILED) {
+      LOG.debug("Plan ID {} {}!", new Object[]{planId, newState});
 
-      // Awake all threads waiting the finish of this job.
+      // Wake up all threads waiting for this plan to finish.
       finishLock.lock();
 
       try {
-        jobFinishedCondition.signalAll();
+        planFinishedCondition.signalAll();
       } finally {
         finishLock.unlock();
       }
     } else {
-      throw new IllegalStateTransitionException(new Exception("Illegal Job State Transition"));
+      throw new IllegalStateTransitionException(new Exception("Illegal Plan State Transition"));
     }
   }
 
 
   /**
-   * Wait for this job to be finished and return the final state.
-   * @return the final state of this job.
+   * Wait for this plan to be finished and return the final state.
+   * @return the final state of this plan.
    */
-  public JobState.State waitUntilFinish() {
+  public PlanState.State waitUntilFinish() {
     finishLock.lock();
     try {
-      if (!isJobDone()) {
-        jobFinishedCondition.await();
+      if (!isPlanDone()) {
+        planFinishedCondition.await();
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
       Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
-    return getJobState();
+    return getPlanState();
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * It wait for at most the given time.
    * @param timeout of waiting.
    * @param unit of the timeout.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  public JobState.State waitUntilFinish(final long timeout, final TimeUnit unit) {
+  public PlanState.State waitUntilFinish(final long timeout, final TimeUnit unit) {
     finishLock.lock();
     try {
-      if (!isJobDone()) {
-        if (!jobFinishedCondition.await(timeout, unit)) {
-          LOG.warn("Timeout during waiting the finish of Job ID {}", jobId);
+      if (!isPlanDone()) {
+        if (!planFinishedCondition.await(timeout, unit)) {
+          LOG.warn("Timeout during waiting the finish of Plan ID {}", planId);
         }
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
       Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
-    return getJobState();
+    return getPlanState();
   }
 
-  public synchronized boolean isJobDone() {
-    return (getJobState() == JobState.State.COMPLETE || getJobState() == JobState.State.FAILED);
+  public synchronized boolean isPlanDone() {
+    return (getPlanState() == PlanState.State.COMPLETE || getPlanState() == PlanState.State.FAILED);
   }
-  public synchronized String getJobId() {
-    return jobId;
+
+  public synchronized String getPlanId() {
+    return planId;
   }
 
-  public synchronized JobState.State getJobState() {
-    return (JobState.State) jobState.getStateMachine().getCurrentState();
+  public synchronized PlanState.State getPlanState() {
+    return (PlanState.State) planState.getStateMachine().getCurrentState();
   }
 
   public synchronized StageState.State getStageState(final String stageId) {
@@ -341,7 +342,7 @@ public synchronized int getTaskAttempt(final String taskId) {
   }
 
   /**
-   * Stores JSON representation of job state into a file.
+   * Stores JSON representation of plan state into a file.
    * @param directory the directory which JSON representation is saved to
    * @param suffix suffix for file name
    */
@@ -350,29 +351,29 @@ public void storeJSON(final String directory, final String suffix) {
       return;
     }
 
-    final File file = new File(directory, jobId + "-" + suffix + ".json");
+    final File file = new File(directory, planId + "-" + suffix + ".json");
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
-          jobId, suffix, file.getPath()));
+      LOG.debug(String.format("JSON representation of plan state for %s(%s) was saved to %s",
+          planId, suffix, file.getPath()));
     } catch (final IOException e) {
-      LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
-          jobId, suffix, file.getPath(), e.toString()));
+      LOG.warn(String.format("Cannot store JSON representation of plan state for %s(%s) to %s: %s",
+          planId, suffix, file.getPath(), e.toString()));
     }
   }
 
   public String toStringWithPhysicalPlan() {
     final StringBuilder sb = new StringBuilder("{");
     sb.append("\"dag\": ").append(physicalPlan.getStageDAG().toString()).append(", ");
-    sb.append("\"jobState\": ").append(toString()).append("}");
+    sb.append("\"planState\": ").append(toString()).append("}");
     return sb.toString();
   }
 
   @Override
   public synchronized String toString() {
     final StringBuilder sb = new StringBuilder("{");
-    sb.append("\"jobId\": \"").append(jobId).append("\", ");
+    sb.append("\"planId\": \"").append(planId).append("\", ");
     sb.append("\"stages\": [");
     boolean isFirstStage = true;
     for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 0aef6a642..41ad24310 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -62,7 +62,7 @@
  * Runtime Master is the central controller of Runtime.
  * Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
  * Runtime Master handles:
- *    a) Scheduling the job with {@link Scheduler}.
+ *    a) Scheduling the plan with {@link Scheduler}.
  *    b) Managing resources with {@link ContainerManager}.
  *    c) Managing blocks with {@link BlockManagerMaster}.
  *    d) Receiving and sending control messages with {@link MessageEnvironment}.
@@ -156,22 +156,22 @@ private Server startRestMetricServer() {
    * @param plan to execute
    * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
    */
-  public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
-                                                                 final int maxScheduleAttempt) {
-    final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
+  public Pair<PlanStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
+                                                                  final int maxScheduleAttempt) {
+    final Callable<Pair<PlanStateManager, ScheduledExecutorService>> planExecutionCallable = () -> {
       this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
         blockManagerMaster.initialize(plan);
-        final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, maxScheduleAttempt);
-        scheduler.scheduleJob(plan, jobStateManager);
-        final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(jobStateManager);
-        return Pair.of(jobStateManager, dagLoggingExecutor);
+        final PlanStateManager planStateManager = new PlanStateManager(plan, metricMessageHandler, maxScheduleAttempt);
+        scheduler.schedulePlan(plan, planStateManager);
+        final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(planStateManager);
+        return Pair.of(planStateManager, dagLoggingExecutor);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     };
     try {
-      return runtimeMasterThread.submit(jobExecutionCallable).get();
+      return runtimeMasterThread.submit(planExecutionCallable).get();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -444,17 +444,17 @@ private void accumulateBarrierMetric(final List<ControlMessage.PartitionSizeEntr
 
   /**
    * Schedules a periodic DAG logging thread.
-   * @param jobStateManager for the job the DAG should be logged.
+   * @param planStateManager for the plan the DAG should be logged.
    * TODO #20: RESTful APIs to Access Job State and Metric.
    * @return the scheduled executor service.
    */
-  private ScheduledExecutorService scheduleDagLogging(final JobStateManager jobStateManager) {
+  private ScheduledExecutorService scheduleDagLogging(final PlanStateManager planStateManager) {
     final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
       private int dagLogFileIndex = 0;
 
       public void run() {
-        jobStateManager.storeJSON(dagDirectory, String.valueOf(dagLogFileIndex++));
+        planStateManager.storeJSON(dagDirectory, String.valueOf(dagLogFileIndex++));
       }
     }, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 4047cbb30..9872f954f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -48,6 +48,6 @@ public void setScheduler(final Scheduler scheduler) {
   public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
     final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
 
-    this.scheduler.updateJob(newPlan.getId(), newPlan);
+    this.scheduler.updatePlan(newPlan.getId(), newPlan);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
similarity index 88%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 57db43bba..ffee47920 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -31,7 +31,7 @@
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.LoggerFactory;
@@ -49,15 +49,15 @@
  * (CONCURRENCY) Only a single dedicated thread should use the public methods of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
- * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
+ * BatchScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
  */
 @DriverSide
 @NotThreadSafe
-public final class BatchSingleJobScheduler implements Scheduler {
-  private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobScheduler.class.getName());
+public final class BatchScheduler implements Scheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(BatchScheduler.class.getName());
 
   /**
-   * Components related to scheduling the given job.
+   * Components related to scheduling the given plan.
    */
   private final SchedulerRunner schedulerRunner;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
@@ -70,19 +70,19 @@
   private final PubSubEventHandlerWrapper pubSubEventHandlerWrapper;
 
   /**
-   * The below variables depend on the submitted job to execute.
+   * The below variables depend on the submitted plan to execute.
    */
   private PhysicalPlan physicalPlan;
-  private JobStateManager jobStateManager;
+  private PlanStateManager planStateManager;
   private List<List<Stage>> sortedScheduleGroups;
 
   @Inject
-  private BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
-                                  final PendingTaskCollectionPointer pendingTaskCollectionPointer,
-                                  final BlockManagerMaster blockManagerMaster,
-                                  final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
-                                  final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
-                                  final ExecutorRegistry executorRegistry) {
+  private BatchScheduler(final SchedulerRunner schedulerRunner,
+                         final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+                         final BlockManagerMaster blockManagerMaster,
+                         final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
+                         final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
+                         final ExecutorRegistry executorRegistry) {
     this.schedulerRunner = schedulerRunner;
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
@@ -96,26 +96,22 @@ private BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
   }
 
   /**
-   * @param physicalPlanOfJob of the job.
-   * @param jobStateManagerOfJob of the job.
+   * @param submittedPhysicalPlan the physical plan to schedule.
+   * @param submittedPlanStateManager the state manager of the plan.
    */
   @Override
-  public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final JobStateManager jobStateManagerOfJob) {
-    if (this.physicalPlan != null || this.jobStateManager != null) {
-      throw new IllegalStateException("scheduleJob() has been called more than once");
-    }
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final PlanStateManager submittedPlanStateManager) {
+    LOG.info("Scheduled plan");
 
-    this.physicalPlan = physicalPlanOfJob;
-    this.jobStateManager = jobStateManagerOfJob;
+    this.physicalPlan = submittedPhysicalPlan;
+    this.planStateManager = submittedPlanStateManager;
 
-    schedulerRunner.run(jobStateManager);
-    LOG.info("Job to schedule: {}", this.physicalPlan.getId());
+    schedulerRunner.run(this.planStateManager);
+    LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
 
-    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices()
-        .stream()
+    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices().stream()
         .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet()
-        .stream()
+        .entrySet().stream()
         .sorted(Map.Entry.comparingByKey())
         .map(Map.Entry::getValue)
         .collect(Collectors.toList());
@@ -124,8 +120,8 @@ public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final JobStateMana
   }
 
   @Override
-  public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan) {
-    // update the job in the scheduler.
+  public void updatePlan(final String planId, final PhysicalPlan newPhysicalPlan) {
+    // update the physical plan in the scheduler.
     // NOTE: what's already been executed is not modified in the new physical plan.
     this.physicalPlan = newPhysicalPlan;
   }
@@ -148,11 +144,11 @@ public void onTaskStateReportFromExecutor(final String executorId,
                                             final TaskState.State newState,
                                             @Nullable final String vertexPutOnHold,
                                             final TaskState.RecoverableTaskFailureCause failureCause) {
-    final int currentTaskAttemptIndex = jobStateManager.getTaskAttempt(taskId);
+    final int currentTaskAttemptIndex = planStateManager.getTaskAttempt(taskId);
 
     if (taskAttemptIndex == currentTaskAttemptIndex) {
       // Do change state, as this notification is for the current task attempt.
-      jobStateManager.onTaskStateChanged(taskId, newState);
+      planStateManager.onTaskStateChanged(taskId, newState);
       switch (newState) {
         case COMPLETE:
           onTaskExecutionComplete(executorId, taskId);
@@ -165,7 +161,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
           onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
           break;
         case FAILED:
-          throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
+          throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
               .append(taskId).append(" in Executor ").append(executorId).toString()));
         case READY:
         case EXECUTING:
@@ -181,8 +177,8 @@ public void onTaskStateReportFromExecutor(final String executorId,
         case ON_HOLD:
           // If the stage has completed
           final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-          if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
-            if (!jobStateManager.isJobDone()) {
+          if (planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
+            if (!planStateManager.isPlanDone()) {
               doSchedule();
             }
           }
@@ -296,7 +292,7 @@ private void doSchedule() {
     return sortedScheduleGroups.stream()
         .filter(scheduleGroup -> scheduleGroup.stream()
             .map(Stage::getId)
-            .map(jobStateManager::getStageState)
+            .map(planStateManager::getStageState)
             .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
         .findFirst(); // selects the one with the smallest scheduling group index.
   }
@@ -309,7 +305,7 @@ private void doSchedule() {
 
     final List<String> taskIdsToSchedule = new LinkedList<>();
     for (final String taskId : stageToSchedule.getTaskIds()) {
-      final TaskState.State taskState = jobStateManager.getTaskState(taskId);
+      final TaskState.State taskState = planStateManager.getTaskState(taskId);
 
       switch (taskState) {
         // Don't schedule these.
@@ -320,7 +316,7 @@ private void doSchedule() {
 
         // These are schedulable.
         case SHOULD_RETRY:
-          jobStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
+          planStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
         case READY:
           taskIdsToSchedule.add(taskId);
           break;
@@ -337,7 +333,7 @@ private void doSchedule() {
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId); // Notify the block manager early for push edges.
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-      final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
+      final int attemptIdx = planStateManager.getTaskAttempt(taskId);
       tasks.add(new Task(
           physicalPlan.getId(),
           taskId,
@@ -385,7 +381,7 @@ private void onTaskExecutionOnHold(final String executorId,
     final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
-        jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+        planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
     if (stageComplete) {
       // get optimization vertex from the task.
@@ -441,7 +437,7 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
     final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
     LOG.info("Will be retried: {}", tasksToRetry);
     tasksToRetry.forEach(
-        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+        taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
   }
 
   private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index aaa82e061..f8413f1a0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -31,23 +31,23 @@
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
  */
 @DriverSide
-@DefaultImplementation(BatchSingleJobScheduler.class)
+@DefaultImplementation(BatchScheduler.class)
 public interface Scheduler {
 
   /**
-   * Schedules the given job.
+   * Schedules the given plan.
    * @param physicalPlan of the job being submitted.
-   * @param jobStateManager to manage the states of the submitted job.
+   * @param planStateManager to manage the states of the submitted plan.
    */
-  void scheduleJob(PhysicalPlan physicalPlan,
-                   JobStateManager jobStateManager);
+  void schedulePlan(PhysicalPlan physicalPlan,
+                    PlanStateManager planStateManager);
 
   /**
    * Receives and updates the scheduler with a new physical plan for a job.
-   * @param jobId the ID of the job to change the physical plan.
+   * @param planId the ID of the physical plan to change.
    * @param newPhysicalPlan new physical plan for the job.
    */
-  void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
+  void updatePlan(String planId, PhysicalPlan newPhysicalPlan);
 
   /**
    * Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index defc91b6e..750c0fcc9 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -46,7 +46,7 @@
 @NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
-  private final Map<String, JobStateManager> jobStateManagers;
+  private final Map<String, PlanStateManager> planStateManagers;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
   private boolean isSchedulerRunning;
@@ -62,7 +62,7 @@ private SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintR
                           final SchedulingPolicy schedulingPolicy,
                           final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                           final ExecutorRegistry executorRegistry) {
-    this.jobStateManagers = new HashMap<>();
+    this.planStateManagers = new HashMap<>();
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
         new Thread(runnable, "SchedulerRunner thread"));
@@ -84,11 +84,11 @@ public void run() {
         doScheduleTaskList();
         schedulingIteration.await();
       }
-      jobStateManagers.values().forEach(jobStateManager -> {
-        if (jobStateManager.isJobDone()) {
-          LOG.info("{} is complete.", jobStateManager.getJobId());
+      planStateManagers.values().forEach(planStateManager -> {
+        if (planStateManager.isPlanDone()) {
+          LOG.info("{} is complete.", planStateManager.getPlanId());
         } else {
-          LOG.info("{} is incomplete.", jobStateManager.getJobId());
+          LOG.info("{} is incomplete.", planStateManager.getPlanId());
         }
       });
       LOG.info("SchedulerRunner Terminated!");
@@ -106,8 +106,8 @@ void doScheduleTaskList() {
     final Collection<Task> taskList = taskListOptional.get();
     final List<Task> couldNotSchedule = new ArrayList<>();
     for (final Task task : taskList) {
-      final JobStateManager jobStateManager = jobStateManagers.get(task.getJobId());
-      if (!jobStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
+      final PlanStateManager planStateManager = planStateManagers.get(task.getPlanId());
+      if (!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) {
         // Guard against race conditions causing duplicate task launches
         LOG.debug("Skipping {} as it is not READY", task.getTaskId());
         continue;
@@ -128,7 +128,7 @@ void doScheduleTaskList() {
           final ExecutorRepresenter selectedExecutor
               = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), task);
           // update metadata first
-          jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+          planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
           LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
 
@@ -164,9 +164,9 @@ void onNewPendingTaskCollectionAvailable() {
   /**
    * Run the scheduler thread.
    */
-  void run(final JobStateManager jobStateManager) {
+  void run(final PlanStateManager planStateManager) {
+    planStateManagers.put(planStateManager.getPlanId(), planStateManager);
     if (!isTerminated && !isSchedulerRunning) {
-      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
       schedulerThread.execute(new SchedulerThread());
       schedulerThread.shutdown();
       isSchedulerRunning = true;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
similarity index 67%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
index 73176be2f..20031538f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
@@ -22,7 +22,7 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
@@ -42,11 +42,11 @@
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link JobStateManager}.
+ * Tests {@link PlanStateManager}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(MetricMessageHandler.class)
-public final class JobStateManagerTest {
+public final class PlanStateManagerTest {
   private static final int MAX_SCHEDULE_ATTEMPT = 2;
   private MetricMessageHandler metricMessageHandler;
 
@@ -59,17 +59,17 @@ public void setUp() throws Exception {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link JobStateManager}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link PlanStateManager}.
    * State changes are explicitly called to check whether states are managed correctly or not.
    */
   @Test
   public void testPhysicalPlanStateChanges() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
-    assertEquals(jobStateManager.getJobId(), "TestPlan");
+    assertEquals(planStateManager.getPlanId(), "TestPlan");
 
     final List<Stage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
 
@@ -77,45 +77,45 @@ public void testPhysicalPlanStateChanges() throws Exception {
       final Stage stage = stageList.get(stageIdx);
       final List<String> taskIds = stage.getTaskIds();
       taskIds.forEach(taskId -> {
-        jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
-        jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
         if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 1) {
-          assertEquals(StageState.State.COMPLETE, jobStateManager.getStageState(stage.getId()));
+          assertEquals(StageState.State.COMPLETE, planStateManager.getStageState(stage.getId()));
         }
       });
-      taskIds.forEach(taskId -> assertEquals(jobStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
+      taskIds.forEach(taskId -> assertEquals(planStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
 
       if (stageIdx == stageList.size() - 1) {
-        assertEquals(jobStateManager.getJobState(), JobState.State.COMPLETE);
+        assertEquals(planStateManager.getPlanState(), PlanState.State.COMPLETE);
       }
     }
   }
 
   /**
-   * Test whether the methods waiting finish of job works properly.
+   * Test whether the methods waiting for the finish of the plan works properly.
    */
   @Test(timeout = 2000)
   public void testWaitUntilFinish() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
-    assertFalse(jobStateManager.isJobDone());
+    assertFalse(planStateManager.isPlanDone());
 
-    // Wait for the job to finish and check the job state.
+    // Wait for the plan to finish and check the plan state.
     // It have to return EXECUTING state after timeout.
-    final JobState.State executingState = jobStateManager.waitUntilFinish(100, TimeUnit.MILLISECONDS);
-    assertEquals(JobState.State.EXECUTING, executingState);
+    final PlanState.State executingState = planStateManager.waitUntilFinish(100, TimeUnit.MILLISECONDS);
+    assertEquals(PlanState.State.EXECUTING, executingState);
 
-    // Complete the job and check the result again.
-    // It have to return COMPLETE.
+    // Complete the plan and check the result again.
+    // It has to return COMPLETE.
     final List<String> tasks = physicalPlan.getStageDAG().getTopologicalSort().stream()
         .flatMap(stage -> stage.getTaskIds().stream())
         .collect(Collectors.toList());
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING));
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE));
-    final JobState.State completedState = jobStateManager.waitUntilFinish();
-    assertEquals(JobState.State.COMPLETE, completedState);
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING));
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE));
+    final PlanState.State completedState = planStateManager.waitUntilFinish();
+    assertEquals(PlanState.State.COMPLETE, completedState);
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
similarity index 88%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index cfa6194f5..9733c7508 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -54,13 +54,13 @@
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link BatchSingleJobScheduler}.
+ * Tests {@link BatchScheduler}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ContainerManager.class, BlockManagerMaster.class,
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class BatchSingleJobSchedulerTest {
-  private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
+public final class BatchSchedulerTest {
+  private static final Logger LOG = LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
   private final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
@@ -80,7 +80,7 @@ public void setUp() throws Exception {
     injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
     injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, mock(PubSubEventHandlerWrapper.class));
     injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
-    scheduler = injector.getInstance(BatchSingleJobScheduler.class);
+    scheduler = injector.getInstance(BatchScheduler.class);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -114,28 +114,28 @@ public void setUp() throws Exception {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchSingleJobScheduler}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchScheduler}.
    * Task state changes are explicitly submitted to scheduler instead of executor messages.
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
-    scheduleAndCheckJobTermination(
+    scheduleAndCheckPlanTermination(
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false));
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchSingleJobScheduler}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchScheduler}.
    * Task state changes are explicitly submitted to scheduler instead of executor messages.
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
-    scheduleAndCheckJobTermination(
+    scheduleAndCheckPlanTermination(
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, true));
   }
 
-  private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws InjectionException {
-    final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, 1);
-    scheduler.scheduleJob(plan, jobStateManager);
+  private void scheduleAndCheckPlanTermination(final PhysicalPlan plan) throws InjectionException {
+    final PlanStateManager planStateManager = new PlanStateManager(plan, metricMessageHandler, 1);
+    scheduler.schedulePlan(plan, planStateManager);
 
     // For each ScheduleGroup, test if the tasks of the next ScheduleGroup are scheduled
     // after the stages of each ScheduleGroup are made "complete".
@@ -146,14 +146,14 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
       stages.forEach(stage -> {
         SchedulerTestUtil.completeStage(
-            jobStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
+            planStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
       });
     }
 
-    LOG.debug("Waiting for job termination after sending stage completion events");
-    while (!jobStateManager.isJobDone()) {
+    LOG.debug("Waiting for plan termination after sending stage completion events");
+    while (!planStateManager.isPlanDone()) {
     }
-    assertTrue(jobStateManager.isJobDone());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   private List<Stage> filterStagesWithAScheduleGroup(
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 815dff8ff..d31965bd2 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import java.util.Optional;
@@ -29,25 +29,25 @@
 final class SchedulerTestUtil {
   /**
    * Complete the stage by completing all of its Tasks.
-   * @param jobStateManager for the submitted job.
-   * @param scheduler for the submitted job.
+   * @param planStateManager for the submitted plan.
+   * @param scheduler for the submitted plan.
    * @param executorRegistry provides executor representers
    * @param stage for which the states should be marked as complete.
    */
-  static void completeStage(final JobStateManager jobStateManager,
+  static void completeStage(final PlanStateManager planStateManager,
                             final Scheduler scheduler,
                             final ExecutorRegistry executorRegistry,
                             final Stage stage,
                             final int attemptIdx) {
     // Loop until the stage completes.
     while (true) {
-      final StageState.State stageState = jobStateManager.getStageState(stage.getId());
+      final StageState.State stageState = planStateManager.getStageState(stage.getId());
       if (StageState.State.COMPLETE == stageState) {
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.INCOMPLETE == stageState) {
         stage.getTaskIds().forEach(taskId -> {
-          final TaskState.State taskState = jobStateManager.getTaskState(taskId);
+          final TaskState.State taskState = planStateManager.getTaskState(taskId);
           if (TaskState.State.EXECUTING == taskState) {
             sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
                 TaskState.State.COMPLETE, attemptIdx, null);
@@ -66,7 +66,7 @@ static void completeStage(final JobStateManager jobStateManager,
   /**
    * Sends task state change event to scheduler.
    * This replaces executor's task completion messages for testing purposes.
-   * @param scheduler for the submitted job.
+   * @param scheduler for the submitted plan.
    * @param executorRegistry provides executor representers
    * @param taskId for the task to change the state.
    * @param newState for the task.
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index cd5443276..e1ab44936 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -20,10 +20,10 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -68,7 +68,7 @@
   private Random random;
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
-  private JobStateManager jobStateManager;
+  private PlanStateManager planStateManager;
 
   private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
 
@@ -91,14 +91,14 @@ public void setUp() throws Exception {
     injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
     scheduler = injector.getInstance(Scheduler.class);
 
-    // Get JobStateManager
-    jobStateManager = runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
+    // Get PlanStateManager
+    planStateManager = runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
   }
 
   @Test(timeout=7000)
   public void testExecutorRemoved() throws Exception {
-    // Until the job finishes, events happen
-    while (!jobStateManager.isJobDone()) {
+    // Until the plan finishes, events happen
+    while (!planStateManager.isPlanDone()) {
       // 50% chance remove, 50% chance add, 80% chance task completed
       executorRemoved(0.5);
       executorAdded(0.5);
@@ -108,9 +108,9 @@ public void testExecutorRemoved() throws Exception {
       Thread.sleep(10);
     }
 
-    // Job should COMPLETE
-    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
-    assertTrue(jobStateManager.isJobDone());
+    // Plan should COMPLETE
+    assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   @Test(timeout=7000)
@@ -120,8 +120,8 @@ public void testTaskOutputWriteFailure() throws Exception {
     executorAdded(1.0);
     executorAdded(1.0);
 
-    // Until the job finishes, events happen
-    while (!jobStateManager.isJobDone()) {
+    // Until the plan finishes, events happen
+    while (!planStateManager.isPlanDone()) {
       // 50% chance task completed
       // 50% chance task output write failed
       taskCompleted(0.5);
@@ -131,9 +131,9 @@ public void testTaskOutputWriteFailure() throws Exception {
       Thread.sleep(10);
     }
 
-    // Job should COMPLETE
-    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
-    assertTrue(jobStateManager.isJobDone());
+    // Plan should COMPLETE
+    assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   ////////////////////////////////////////////////////////////////// Events
@@ -177,12 +177,12 @@ private void taskCompleted(final double chance) {
       return;
     }
 
-    final List<String> executingTasks = getTasksInState(jobStateManager, TaskState.State.EXECUTING);
+    final List<String> executingTasks = getTasksInState(planStateManager, TaskState.State.EXECUTING);
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
       SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
-          TaskState.State.COMPLETE, jobStateManager.getTaskAttempt(selectedTask));
+          TaskState.State.COMPLETE, planStateManager.getTaskAttempt(selectedTask));
     }
   }
 
@@ -191,30 +191,30 @@ private void taskOutputWriteFailed(final double chance) {
       return;
     }
 
-    final List<String> executingTasks = getTasksInState(jobStateManager, TaskState.State.EXECUTING);
+    final List<String> executingTasks = getTasksInState(planStateManager, TaskState.State.EXECUTING);
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
       SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
-          TaskState.State.SHOULD_RETRY, jobStateManager.getTaskAttempt(selectedTask),
+          TaskState.State.SHOULD_RETRY, planStateManager.getTaskAttempt(selectedTask),
           TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE);
     }
   }
 
   ////////////////////////////////////////////////////////////////// Helper methods
 
-  private List<String> getTasksInState(final JobStateManager jobStateManager, final TaskState.State state) {
-    return jobStateManager.getAllTaskStates().entrySet().stream()
+  private List<String> getTasksInState(final PlanStateManager planStateManager, final TaskState.State state) {
+    return planStateManager.getAllTaskStates().entrySet().stream()
         .filter(entry -> entry.getValue().getStateMachine().getCurrentState().equals(state))
         .map(Map.Entry::getKey)
         .collect(Collectors.toList());
   }
 
-  private JobStateManager runPhysicalPlan(final TestPlanGenerator.PlanType planType) throws Exception {
+  private PlanStateManager runPhysicalPlan(final TestPlanGenerator.PlanType planType) throws Exception {
     final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
     final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType, false);
-    final JobStateManager jobStateManager = new JobStateManager(plan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-    return jobStateManager;
+    final PlanStateManager planStateManager = new PlanStateManager(plan, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    scheduler.schedulePlan(plan, planStateManager);
+    return planStateManager;
   }
 }
diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 5742238ac..3b3646eb1 100644
--- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -94,7 +94,7 @@ public static PhysicalPlan generatePhysicalPlan(final PlanType planType, final b
   private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDAG,
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
-    final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
+    final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
     return new PhysicalPlan("TestPlan", physicalDAG);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services