You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/22 00:26:31 UTC

[incubator-nemo] branch master updated: [NEMO-300] Fix starvation when handling multiple pending data fetchers #161

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 79c5e05  [NEMO-300] Fix starvation when handling multiple pending data fetchers #161
79c5e05 is described below

commit 79c5e05ba8643aac09978351443c1adfbaddaccb
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Thu Nov 22 09:26:27 2018 +0900

    [NEMO-300] Fix starvation when handling multiple pending data fetchers #161
    
    JIRA: [NEMO-300: Fix starvation when handling multiple pending data fetchers](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-300)
    
    **Major changes:**
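    - Fix the polling logic so that, once the polling interval has elapsed, all pending data fetchers are iterated (the polling-time check is moved out of the loop condition, which previously starved later fetchers)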
---
 .../nemo/runtime/executor/task/TaskExecutor.java   | 45 ++++++++++++----------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index b08b12a..af31979 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -461,31 +461,34 @@ public final class TaskExecutor {
 
       final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
       final long currentTime = System.currentTimeMillis();
-      // We check pending data every polling interval
-      while (pendingIterator.hasNext()
-        && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+
+
+      if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+        // We check pending data every polling interval
         prevPollingTime = currentTime;
 
-        final DataFetcher dataFetcher = pendingIterator.next();
-        try {
-          final Object element = dataFetcher.fetchDataElement();
-          onEventFromDataFetcher(element, dataFetcher);
+        while (pendingIterator.hasNext()) {
+          final DataFetcher dataFetcher = pendingIterator.next();
+          try {
+            final Object element = dataFetcher.fetchDataElement();
+            onEventFromDataFetcher(element, dataFetcher);
+
+            // We processed data. This means the data fetcher is now available.
+            // Add current data fetcher to available
+            pendingIterator.remove();
+            if (!(element instanceof Finishmark)) {
+              availableFetchers.add(dataFetcher);
+            }
 
-          // We processed data. This means the data fetcher is now available.
-          // Add current data fetcher to available
-          pendingIterator.remove();
-          if (!(element instanceof Finishmark)) {
-            availableFetchers.add(dataFetcher);
+          } catch (final NoSuchElementException e) {
+            // The current data fetcher is still pending.. try next data fetcher
+          } catch (final IOException e) {
+            // IOException means that this task should be retried.
+            taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+              Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+            LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
+            return false;
           }
-
-        } catch (final NoSuchElementException e) {
-          // The current data fetcher is still pending.. try next data fetcher
-        } catch (final IOException e) {
-          // IOException means that this task should be retried.
-          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
-            Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
-          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
-          return false;
         }
       }