You are viewing a plain-text version of this content. The link to its canonical version (originally shown as "here") was lost in the plain-text conversion.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/29 07:04:57 UTC

[GitHub] seojangho closed pull request #60: [NEMO-61] Fix lost execution metric collection problem

seojangho closed pull request #60: [NEMO-61] Fix lost execution metric collection problem
URL: https://github.com/apache/incubator-nemo/pull/60
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff once such a PR is merged, the diff
is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 3b7e44c1..4d07752e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -198,6 +198,8 @@ private MessageType getMsgType(final ControlMessage.Message controlMessage) {
       case DataSizeMetric:
       case ExecutorDataCollected:
       case MetricMessageReceived:
+      case RequestMetricFlush:
+      case MetricFlushed:
         return MessageType.Send;
       case RequestBlockLocation:
         return MessageType.Request;
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index d662f661..7a3cd7f7 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -55,8 +55,10 @@ enum MessageType {
     RequestBlockLocation = 4;
     BlockLocationInfo = 5;
     ExecutorFailed = 6;
-    MetricMessageReceived = 7;
-    ExecutorDataCollected = 8;
+    ExecutorDataCollected = 7;
+    MetricMessageReceived = 8;
+    RequestMetricFlush = 9;
+    MetricFlushed = 10;
 }
 
 message Message {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
index a05b7d2a..a39916b4 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
@@ -19,8 +19,6 @@
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
-import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index f5a2e835..8852e0e4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -161,15 +161,18 @@ public void terminate() {
     @Override
     public void onMessage(final ControlMessage.Message message) {
       switch (message.getType()) {
-      case ScheduleTask:
-        final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
-        final Task task =
-            SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
-        onTaskReceived(task);
-        break;
-      default:
-        throw new IllegalMessageException(
-            new Exception("This message should not be received by an executor :" + message.getType()));
+        case ScheduleTask:
+          final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
+          final Task task =
+              SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
+          onTaskReceived(task);
+          break;
+        case RequestMetricFlush:
+          metricMessageSender.flush();
+          break;
+        default:
+          throw new IllegalMessageException(
+              new Exception("This message should not be received by an executor :" + message.getType()));
       }
     }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 77830360..f6ffd084 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -53,6 +53,17 @@ private MetricManagerWorker(@Parameter(MetricFlushPeriod.class) final long flush
                                                       flushingPeriod, TimeUnit.MILLISECONDS);
   }
 
+  @Override
+  public void flush() {
+    flushMetricMessageQueueToMaster();
+    persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
+        ControlMessage.Message.newBuilder()
+            .setId(RuntimeIdGenerator.generateMessageId())
+            .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+            .setType(ControlMessage.MessageType.MetricFlushed)
+            .build());
+  }
+
   private synchronized void flushMetricMessageQueueToMaster() {
     if (!metricMessageQueue.isEmpty()) {
       // Build batched metric messages
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
index 1d433dbb..dcd51559 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
@@ -23,7 +23,20 @@
 @DefaultImplementation(MetricManagerWorker.class)
 public interface MetricMessageSender extends AutoCloseable {
 
+  /**
+   * Send metric to master.
+   * @param metricKey key of the metric
+   * @param metricValue value of the metric
+   */
   void send(final String metricKey, final String metricValue);
 
+  /**
+   * Flush all metric inside of the queue.
+   */
+  void flush();
+
+  /**
+   * Flush the metric queue and close the metric dispatch.
+   */
   void close();
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c2919a32..df9454a9 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -43,11 +43,8 @@
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
-import edu.snu.nemo.runtime.master.ClientRPC;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import edu.snu.nemo.runtime.master.*;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.RuntimeMaster;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.scheduler.*;
@@ -91,7 +88,7 @@
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
-    SourceVertex.class, ClientRPC.class})
+    SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
   private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
@@ -139,11 +136,12 @@ public void setUp() throws InjectionException {
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
     final ClientRPC clientRPC = mock(ClientRPC.class);
+    final MetricManagerMaster metricManagerMaster = mock(MetricManagerMaster.class);
 
     // Necessary for wiring up the message environments
     final RuntimeMaster runtimeMaster =
         new RuntimeMaster(scheduler, containerManager, master,
-            metricMessageHandler, messageEnvironment, clientRPC, EMPTY_DAG_DIRECTORY);
+            metricMessageHandler, messageEnvironment, clientRPC, metricManagerMaster, EMPTY_DAG_DIRECTORY);
 
     final Injector injector1 = Tang.Factory.getTang().newInjector();
     injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
index c1571efb..5b6a8fdc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
@@ -17,6 +17,10 @@
 
 import javax.inject.Inject;
 
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,11 +39,24 @@
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerMaster.class.getName());
   private final Map<String, List<String>> compUnitIdToMetricInJson;
   private boolean isTerminated;
+  private final ExecutorRegistry executorRegistry;
 
   @Inject
-  private MetricManagerMaster() {
+  private MetricManagerMaster(final ExecutorRegistry executorRegistry) {
     this.compUnitIdToMetricInJson = new HashMap<>();
     this.isTerminated = false;
+    this.executorRegistry = executorRegistry;
+  }
+
+  public synchronized void sendMetricFlushRequest() {
+    executorRegistry.viewExecutors(executors -> executors.forEach(executor -> {
+      final ControlMessage.Message message = ControlMessage.Message.newBuilder()
+          .setId(RuntimeIdGenerator.generateMessageId())
+          .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+          .setType(ControlMessage.MessageType.RequestMetricFlush)
+          .build();
+      executor.sendControlMessage(message);
+    }));
   }
 
   @Override
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 61ffd203..6d409d8b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -69,6 +69,7 @@
 public final class RuntimeMaster {
   private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
   private static final int DAG_LOGGING_PERIOD = 3000;
+  private static final int METRIC_ARRIVE_TIMEOUT = 10000;
 
   private final ExecutorService runtimeMasterThread;
 
@@ -79,6 +80,7 @@
   private final MessageEnvironment masterMessageEnvironment;
   private final Map<Integer, Long> aggregatedMetricData;
   private final ClientRPC clientRPC;
+  private final MetricManagerMaster metricManagerMaster;
 
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
@@ -88,6 +90,7 @@
 
   private final AtomicInteger resourceRequestCount;
 
+  private CountDownLatch metricCountDownLatch;
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
@@ -96,6 +99,7 @@ public RuntimeMaster(final Scheduler scheduler,
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
                        final ClientRPC clientRPC,
+                       final MetricManagerMaster metricManagerMaster,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
@@ -110,6 +114,7 @@ public RuntimeMaster(final Scheduler scheduler,
     this.masterMessageEnvironment
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
     this.clientRPC = clientRPC;
+    this.metricManagerMaster = metricManagerMaster;
     this.dagDirectory = dagDirectory;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
@@ -145,6 +150,17 @@ public RuntimeMaster(final Scheduler scheduler,
   }
 
   public void terminate() {
+    // send metric flush request to all executors
+    metricManagerMaster.sendMetricFlushRequest();
+    try {
+      // wait for metric flush
+      if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        LOG.warn("Terminating master before all executor terminated messages arrived.");
+      }
+    } catch (final InterruptedException e) {
+      LOG.warn("Waiting executor terminating process interrupted.");
+    }
+
     runtimeMasterThread.execute(() -> {
 
       scheduler.terminate();
@@ -176,6 +192,7 @@ public void requestContainer(final String resourceSpecificationString) {
           resourceRequestCount.getAndAdd(executorNum);
           containerManager.requestContainer(executorNum, builder.build());
         }
+        metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
         throw new ContainerException(e);
       }
@@ -237,6 +254,7 @@ public boolean onExecutorLaunched(final ActiveContext activeContext) {
   public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
     runtimeMasterThread.execute(() -> {
       LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
+      metricCountDownLatch.countDown();
 
       // Note that getFailedContextList() can be empty if the failure occurred
       // prior to launching an Executor on the Evaluator.
@@ -310,6 +328,9 @@ private void handleControlMessage(final ControlMessage.Message message) {
             .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
             .build());
         break;
+      case MetricFlushed:
+        metricCountDownLatch.countDown();
+        break;
       default:
         throw new IllegalMessageException(
             new Exception("This message should not be received by Master :" + message.getType()));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
index 343151f0..8f052f0e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -59,7 +59,7 @@ synchronized void registerExecutor(final ExecutorRepresenter executor) {
     }
   }
 
-  synchronized void viewExecutors(final Consumer<Set<ExecutorRepresenter>> consumer) {
+  public synchronized void viewExecutors(final Consumer<Set<ExecutorRepresenter>> consumer) {
     consumer.accept(getRunningExecutors());
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java b/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java
new file mode 100644
index 00000000..75f964ff
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/runtime/master/MetricFlushTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageContext;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.common.message.MessageListener;
+import edu.snu.nemo.runtime.common.message.MessageSender;
+import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
+import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
+import edu.snu.nemo.runtime.executor.MetricManagerWorker;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Ensures metrics collected by {@link edu.snu.nemo.runtime.executor.MetricManagerWorker} are properly sent to master
+ * before the job finishes.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ExecutorRegistry.class})
+public final class MetricFlushTest {
+  private static final Tang TANG = Tang.Factory.getTang();
+  private static final String MASTER = "MASTER";
+  private static final String WORKER = "WORKER";
+  private static final int EXECUTOR_NUM = 5;
+  private static MessageSender masterToWorkerSender;
+
+  @Test(timeout = 10000)
+  public void test() throws InjectionException, ExecutionException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(EXECUTOR_NUM);
+
+    final LocalMessageDispatcher localMessagedispatcher = new LocalMessageDispatcher();
+
+    final Configuration configuration = TANG.newConfigurationBuilder()
+        .build();
+    final Injector injector = TANG.newInjector(configuration);
+
+    final Injector masterInjector = injector.forkInjector();
+    final Injector workerInjector = injector.forkInjector();
+
+    final LocalMessageEnvironment masterMessageEnvironment = new LocalMessageEnvironment(MASTER,
+        localMessagedispatcher);
+    masterInjector.bindVolatileInstance(MessageEnvironment.class, masterMessageEnvironment);
+
+    final LocalMessageEnvironment workerMessageEnvironment = new LocalMessageEnvironment(WORKER,
+        localMessagedispatcher);
+    workerInjector.bindVolatileInstance(MessageEnvironment.class, workerMessageEnvironment);
+
+    masterToWorkerSender = masterMessageEnvironment
+        .asyncConnect(WORKER, MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID).get();
+
+    final Set<ExecutorRepresenter> executorRepresenterSet = new HashSet<>();
+
+    for (int i = 0; i < EXECUTOR_NUM; i++) {
+      executorRepresenterSet.add(newWorker());
+    }
+
+    final ExecutorRegistry executorRegistry = mock(ExecutorRegistry.class);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      final Consumer<Set<ExecutorRepresenter>> consumer = (Consumer) invocationOnMock.getArguments()[0];
+      consumer.accept(executorRepresenterSet);
+      return null;
+    }).when(executorRegistry).viewExecutors(any());
+
+    masterInjector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
+
+    final MetricManagerMaster metricManagerMaster = masterInjector.getInstance(MetricManagerMaster.class);
+    final MetricManagerWorker metricManagerWorker = workerInjector.getInstance(MetricManagerWorker.class);
+
+    masterMessageEnvironment.setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID,
+        new MessageListener<Object>() {
+        @Override
+        public void onMessage(Object message) {
+          latch.countDown();
+        }
+
+        @Override
+        public void onMessageWithContext(Object message, MessageContext messageContext) {
+        }
+    });
+
+    workerMessageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID,
+        new MessageListener<Object>() {
+          @Override
+          public void onMessage(Object message) {
+            metricManagerWorker.flush();
+          }
+
+          @Override
+          public void onMessageWithContext(Object message, MessageContext messageContext) {
+          }
+        });
+
+    metricManagerMaster.sendMetricFlushRequest();
+
+    latch.await();
+  }
+
+  private ExecutorRepresenter newWorker() {
+    final ExecutorRepresenter workerRepresenter = mock(ExecutorRepresenter.class);
+    doAnswer((Answer<Void>) invocationOnMock -> {
+      final ControlMessage.Message msg = (ControlMessage.Message) invocationOnMock.getArguments()[0];
+      masterToWorkerSender.send(msg);
+      return null;
+    }).when(workerRepresenter).sendControlMessage(any());
+    return workerRepresenter;
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services