You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/22 00:26:31 UTC
[incubator-nemo] branch master updated: [NEMO-300] Fix starvation when handling multiple pending data fetchers #161
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 79c5e05 [NEMO-300] Fix starvation when handling multiple pending data fetchers #161
79c5e05 is described below
commit 79c5e05ba8643aac09978351443c1adfbaddaccb
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Thu Nov 22 09:26:27 2018 +0900
[NEMO-300] Fix starvation when handling multiple pending data fetchers #161
JIRA: [NEMO-300: Fix starvation when handling multiple pending data fetchers](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-300)
**Major changes:**
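- Fix the polling logic so that the polling-time check is done once per pass and every pending data fetcher is then visited, instead of re-checking the polling time before each fetcher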
---
.../nemo/runtime/executor/task/TaskExecutor.java | 45 ++++++++++++----------
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index b08b12a..af31979 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -461,31 +461,34 @@ public final class TaskExecutor {
final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
final long currentTime = System.currentTimeMillis();
- // We check pending data every polling interval
- while (pendingIterator.hasNext()
- && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+
+
+ if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+ // We check pending data every polling interval
prevPollingTime = currentTime;
- final DataFetcher dataFetcher = pendingIterator.next();
- try {
- final Object element = dataFetcher.fetchDataElement();
- onEventFromDataFetcher(element, dataFetcher);
+ while (pendingIterator.hasNext()) {
+ final DataFetcher dataFetcher = pendingIterator.next();
+ try {
+ final Object element = dataFetcher.fetchDataElement();
+ onEventFromDataFetcher(element, dataFetcher);
+
+ // We processed data. This means the data fetcher is now available.
+ // Add current data fetcher to available
+ pendingIterator.remove();
+ if (!(element instanceof Finishmark)) {
+ availableFetchers.add(dataFetcher);
+ }
- // We processed data. This means the data fetcher is now available.
- // Add current data fetcher to available
- pendingIterator.remove();
- if (!(element instanceof Finishmark)) {
- availableFetchers.add(dataFetcher);
+ } catch (final NoSuchElementException e) {
+ // The current data fetcher is still pending.. try next data fetcher
+ } catch (final IOException e) {
+ // IOException means that this task should be retried.
+ taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+ Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+ LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
+ return false;
}
-
- } catch (final NoSuchElementException e) {
- // The current data fetcher is still pending.. try next data fetcher
- } catch (final IOException e) {
- // IOException means that this task should be retried.
- taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
- Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
- LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
- return false;
}
}