You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/03/07 06:27:54 UTC

[GitHub] johnyangk commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

johnyangk commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172750797
 
 

 ##########
 File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
 ##########
 @@ -15,123 +15,100 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque<Runnable> eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-    testComplete = false;
-    completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-    eventRunnableQueue = new LinkedBlockingDeque<>();
-
-    for (int i = 0; i < 5; i++) {
-      completionEventThreadPool.execute(() -> {
-        while (!testComplete || !eventRunnableQueue.isEmpty()) {
-          try {
-            final Runnable event = eventRunnableQueue.takeFirst();
-            event.run();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      });
-    }
-    completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-    testComplete = true;
-  }
-
   /**
    * Sends a stage's completion event to scheduler, with all its task groups marked as complete as well.
    * This replaces executor's task group completion messages for testing purposes.
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
    * @param physicalStage for which the states should be marked as complete.
    */
   public static void sendStageCompletionEventToScheduler(final JobStateManager jobStateManager,
                                                          final Scheduler scheduler,
-                                                         final ContainerManager containerManager,
+                                                         final ExecutorRegistry executorRegistry,
                                                          final PhysicalStage physicalStage,
                                                          final int attemptIdx) {
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-        while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-            == StageState.State.EXECUTING) {
-          physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-            if (jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-                == TaskGroupState.State.EXECUTING) {
-              sendTaskGroupStateEventToScheduler(scheduler, containerManager, taskGroupId,
-                  TaskGroupState.State.COMPLETE, attemptIdx, null);
-            }
-          });
-        }
+    // Loop until the stage completes.
+    while (true) {
+      try {
+        Thread.sleep(100);
 
 Review comment:
   @seojangho Please remove L52-57. I wrote them to test something and forgot to remove them. Sorry about this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services