You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/29 10:59:35 UTC

[GitHub] taegeonum commented on a change in pull request #130: [NEMO-233] Emit watermark at unbounded source

taegeonum commented on a change in pull request #130: [NEMO-233] Emit watermark at unbounded source 
URL: https://github.com/apache/incubator-nemo/pull/130#discussion_r228879918
 
 

 ##########
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 ##########
 @@ -303,46 +310,143 @@ private void finalizeVertex(final VertexHarness vertexHarness) {
   }
 
   /**
+   * Process a single element fetched from the given dataFetcher.
+   * Depending on the type of the element, one of the following happens:
+   *  -- Finishmark: the fetcher has no more data. Its read statistics are accumulated
+   *     (bounded source read time for a SourceVertexDataFetcher, serialized/encoded bytes
+   *     for a ParentTaskDataFetcher) and the fetcher is removed from dataFetchers.
+   *  -- Watermark: it is passed to processWatermark with the fetcher's output collector.
+   *  -- Anything else: it is passed to processElement with the fetcher's output collector.
+   *
+   * NOTE(review): handleDataFetchers calls this method while iterating over the same list with an Iterator.
+   * Removing via List#remove here (instead of Iterator#remove) may raise a ConcurrentModificationException
+   * on the next iteration step when the removed fetcher is not the last one -- please confirm.
+   *
+   * @param element the fetched element (a Finishmark, a Watermark, or a data element)
+   * @param dataFetcher the data fetcher that produced the element
+   * @param dataFetchers the list from which the data fetcher is removed when it finishes
+   */
+  private void handleElement(final Object element,
+                             final DataFetcher dataFetcher,
+                             final List<DataFetcher> dataFetchers) {
+    if (element instanceof Finishmark) {
+      // We've consumed all the data from this data fetcher: record its read statistics first.
+      if (dataFetcher instanceof SourceVertexDataFetcher) {
+        boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
+      } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+        serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
+        encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
+      }
+
+      // Remove the finished data fetcher from the list so that it is not fetched from again.
+      dataFetchers.remove(dataFetcher);
+    } else if (element instanceof Watermark) {
+      // Watermark: hand it to processWatermark together with this fetcher's output collector.
+      processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+    } else {
+      // Regular data element: hand it to processElement together with this fetcher's output collector.
+      processElement(dataFetcher.getOutputCollector(), element);
+    }
+  }
+
+  /**
+   * Check if it is time to poll pending fetchers' data,
+   * i.e., whether at least pollingPeriod has elapsed between prevTime and currentTime.
+   * All three arguments are expected to be expressed in the same time unit.
+   * @param pollingPeriod minimum interval that must elapse between two polls
+   * @param currentTime current time
+   * @param prevTime time of the previous poll
+   * @return true if (currentTime - prevTime) is greater than or equal to pollingPeriod, false otherwise
+   */
+  private boolean isPollingTime(final long pollingPeriod,
+                                final long currentTime,
+                                final long prevTime) {
+    return (currentTime - prevTime) >= pollingPeriod;
+  }
+
+  /**
+   * This retrieves data from data fetchers and processes them.
+   * It maintains two lists:
+   *  -- availableFetchers: maintain data fetchers that currently have data elements to retrieve
+   *  -- pendingFetchers: maintain data fetchers that currently do not have available elements.
+   *     This can become available in the future, and therefore we check the pending fetchers every pollingInterval.
+   *
+   *  If a data fetcher finishes, we remove it from the two lists.
+   *  If a data fetcher has no available element, we move the data fetcher to pendingFetchers
+   *  If a pending data fetcher has element, we move it to availableFetchers
+   *  If there are no available fetchers but pending fetchers, sleep for pollingPeriod
+   *  and retry fetching data from the pendingFetchers.
+   *
    * @param fetchers to handle.
    * @return false if IOException.
    */
   private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
-    final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
-    while (!availableFetchers.isEmpty()) { // empty means we've consumed all task-external input data
-      // For this looping of available fetchers.
-      int finishedFetcherIndex = NONE_FINISHED;
-      for (int i = 0; i < availableFetchers.size(); i++) {
-        final DataFetcher dataFetcher = availableFetchers.get(i);
-        final Object element;
+    final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
+    final List<DataFetcher> pendingFetchers = new LinkedList<>();
+
+    // Polling interval.
+    final long pollingInterval = 100; // ms
+
+    // Previous polling time
+    long prevPollingTime = System.currentTimeMillis();
+
+    // empty means we've consumed all task-external input data
+    while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
+      // We first fetch data from available data fetchers
+      final Iterator<DataFetcher> availableIterator = availableFetchers.iterator();
+
+      while (availableIterator.hasNext()) {
+        final DataFetcher dataFetcher = availableIterator.next();
         try {
-          element = dataFetcher.fetchDataElement();
-        } catch (NoSuchElementException e) {
-          // We've consumed all the data from this data fetcher.
-          if (dataFetcher instanceof SourceVertexDataFetcher) {
-            boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
-          } else if (dataFetcher instanceof ParentTaskDataFetcher) {
-            serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
-            encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
-          }
-          finishedFetcherIndex = i;
-          break;
-        } catch (IOException e) {
+          handleElement(dataFetcher.fetchDataElement(), dataFetcher, availableFetchers);
+        } catch (final NoSuchElementException e) {
+          // No element in current data fetcher, fetch data from next fetcher
+          // move current data fetcher to pending.
+          availableIterator.remove();
+          pendingFetchers.add(dataFetcher);
+        } catch (final IOException e) {
           // IOException means that this task should be retried.
           taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
             Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
           LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
           return false;
         }
+      }
+
+      final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
+      final long currentTime = System.currentTimeMillis();
+      // We check pending data every polling interval
+      while (pendingIterator.hasNext()
+        && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+        prevPollingTime = currentTime;
 
 Review comment:
   currentTime is updated at line 404. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services