Posted to issues@tez.apache.org by GitBox <gi...@apache.org> on 2021/10/20 05:11:24 UTC

[GitHub] [tez] rbalamohan commented on a change in pull request #152: TEZ-4338: Tez should consider node information to realize OUTPUT_LOST as early as possible - upstream(mapper) problems

rbalamohan commented on a change in pull request #152:
URL: https://github.com/apache/tez/pull/152#discussion_r732419364



##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptE
   MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
         TaskAttemptEvent event) {
       TaskAttemptEventOutputFailed outputFailedEvent = 
           (TaskAttemptEventOutputFailed) event;
-      TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
-      TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
-      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
+      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+      TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent();
       int failedInputIndexOnDestTa = readErrorEvent.getIndex();
-      if (readErrorEvent.getVersion() != attempt.getID().getId()) {
-        throw new TezUncheckedException(attempt.getID()
+
+      if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+        throw new TezUncheckedException(sourceAttempt.getID()
             + " incorrectly blamed for read error from " + failedDestTaId
             + " at inputIndex " + failedInputIndexOnDestTa + " version"
             + readErrorEvent.getVersion());
       }
-      LOG.info(attempt.getID()
-            + " blamed for read error from " + failedDestTaId
-            + " at inputIndex " + failedInputIndexOnDestTa);
-      long time = attempt.clock.getTime();
-      Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+      // source host: where the data input is supposed to come from
+      String sHost = sourceAttempt.getNodeId().getHost();
+      // destination: where the data is tried to be fetched to
+      String dHost = readErrorEvent.getDestinationLocalhostName();
+
+      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(),
+          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+      Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();
+      if (!downstreamBlamingHosts.containsKey(sHost)) {
+        LOG.info("Host {} is blamed for fetch failure from {} for the first time", sHost, dHost);
+        downstreamBlamingHosts.put(sHost, new HashSet<String>());
+      }
 
+      downstreamBlamingHosts.get(sHost).add(dHost);
+      int currentNumberOfFailingDownstreamHosts = downstreamBlamingHosts.get(sHost).size();
+      if (currentNumberOfFailingDownstreamHosts > sourceAttempt.getVertex().getVertexConfig()
+          .getMaxAllowedDownstreamHostsReportingFetchFailure()) {
+        LOG.info("Host will be marked fail: {} because of {} distinct upstream hosts having fetch failures", sHost,
+            currentNumberOfFailingDownstreamHosts);
+        tooManyDownstreamHostsBlamedTheSameUpstreamHost = true;
+      }
+
+      long time = sourceAttempt.clock.getTime();
+
+      Long firstErrReportTime = sourceAttempt.uniquefailedOutputReports.get(failedDestTaId);
       if (firstErrReportTime == null) {
-        attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+        sourceAttempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
       }
 
-      int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+      int maxAllowedOutputFailures = sourceAttempt.getVertex().getVertexConfig()

Review comment:
       The rest of the refactoring, and the additional code that passes the hostname in the read error event, looks fine.

##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
##########
@@ -1793,80 +1793,107 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptE
   MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @Override
-    public TaskAttemptStateInternal transition(TaskAttemptImpl attempt,
+    public TaskAttemptStateInternal transition(TaskAttemptImpl sourceAttempt,
         TaskAttemptEvent event) {
       TaskAttemptEventOutputFailed outputFailedEvent = 
           (TaskAttemptEventOutputFailed) event;
-      TezEvent tezEvent = outputFailedEvent.getInputFailedEvent();
-      TezTaskAttemptID failedDestTaId = tezEvent.getSourceInfo().getTaskAttemptID();
-      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)tezEvent.getEvent();
+      TezEvent inputFailedEvent = outputFailedEvent.getInputFailedEvent();
+      TezTaskAttemptID failedDestTaId = inputFailedEvent.getSourceInfo().getTaskAttemptID();
+
+      InputReadErrorEvent readErrorEvent = (InputReadErrorEvent)inputFailedEvent.getEvent();
       int failedInputIndexOnDestTa = readErrorEvent.getIndex();
-      if (readErrorEvent.getVersion() != attempt.getID().getId()) {
-        throw new TezUncheckedException(attempt.getID()
+
+      if (readErrorEvent.getVersion() != sourceAttempt.getID().getId()) {
+        throw new TezUncheckedException(sourceAttempt.getID()
             + " incorrectly blamed for read error from " + failedDestTaId
             + " at inputIndex " + failedInputIndexOnDestTa + " version"
             + readErrorEvent.getVersion());
       }
-      LOG.info(attempt.getID()
-            + " blamed for read error from " + failedDestTaId
-            + " at inputIndex " + failedInputIndexOnDestTa);
-      long time = attempt.clock.getTime();
-      Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+      // source host: where the data input is supposed to come from
+      String sHost = sourceAttempt.getNodeId().getHost();
+      // destination: where the data is tried to be fetched to
+      String dHost = readErrorEvent.getDestinationLocalhostName();
+
+      LOG.info("{} (on {}) blamed for read error from {} (on {}) at inputIndex {}", sourceAttempt.getID(),
+          sHost, failedDestTaId, dHost, failedInputIndexOnDestTa);
+
+      boolean tooManyDownstreamHostsBlamedTheSameUpstreamHost = false;
+      Map<String, Set<String>> downstreamBlamingHosts = sourceAttempt.getVertex().getDownstreamBlamingHosts();

Review comment:
       Wouldn't the downstreamBlamingHosts structure occupy a lot of memory in the AM on large clusters? E.g. think of a job running at 1000-node scale with 10 vertices. This can easily have 1000 tasks x 1000 nodes entries, which can lead to memory pressure on the AM. Add the number of vertices to this and the memory pressure gets even worse.
   
   How about tracking the downstream hostnames in a set and using that as an optional dimension in the computation? This way, even if multiple task attempts were running on the same host, they would be counted as a single failure (note that currently, in the master branch, they are counted multiple times).
   
   To be more specific, is it possible to track downstream hostnames in a set and use that set.size() for computing the fraction that determines whether the src has to be re-executed or not?
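
   A rough sketch of what I mean, not the actual Tez code: the class name, method names and the fraction parameter below are all hypothetical, and the total number of downstream hosts is assumed to be available from somewhere else.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Tez: keeps the distinct downstream hosts
// that blamed a single source attempt for a fetch failure.
class DownstreamHostTracker {
  private final Set<String> blamingHosts = new HashSet<>();

  /** Records a reporting host; returns true if this host was not seen before. */
  boolean report(String downstreamHost) {
    return blamingHosts.add(downstreamHost);
  }

  /** Number of distinct hosts that reported, regardless of how many attempts ran on each. */
  int distinctHosts() {
    return blamingHosts.size();
  }

  /** Fraction of downstream hosts that reported a failure; 0 if the total is unknown. */
  double failureFraction(int totalDownstreamHosts) {
    if (totalDownstreamHosts <= 0) {
      return 0.0;
    }
    return (double) blamingHosts.size() / totalDownstreamHosts;
  }

  /** True if the reporting fraction exceeds the allowed fraction (assumed config value). */
  boolean shouldRerunSource(int totalDownstreamHosts, double maxAllowedFraction) {
    return failureFraction(totalDownstreamHosts) > maxAllowedFraction;
  }
}
```

   With this shape, two failing attempts on the same host add only one entry, and the memory cost is bounded by the number of distinct hosts that reported against the attempt.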

##########
File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
##########
@@ -300,6 +300,21 @@ public TezConfiguration(boolean loadDefaults) {
       TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
   public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;
 
+  /**
+   * int value. The maximum number of distinct downstream hosts that can report a fetch failure
+   * for a single upstream host before the upstream task attempt is marked as failed (so blamed for
+   * the fetch failure). E.g. if this set to 1, in case of 2 different hosts reporting fetch failure
+   * for the same upstream host the upstream task is immediately blamed for the fetch failure.
+   * TODO: could this be proportional to the number of hosts running consumer/downstream tasks ?
+   *
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_MAX_ALLOWED_DOWNSTREAM_HOSTS_REPORTING_FETCH_FAILURE =

Review comment:
       How do we determine this value for a large cluster or a dynamically changing cluster?
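
   One possible direction, in line with the TODO in the javadoc, would be to derive the limit from the number of hosts currently running downstream tasks instead of fixing it. A minimal sketch under that assumption; the class, the method and both parameters are hypothetical and do not exist in Tez:

```java
// Hypothetical helper, not part of Tez: scales the allowed number of blaming
// hosts with the number of hosts currently running downstream tasks.
class FetchFailureThreshold {

  /**
   * @param hostsRunningDownstreamTasks current count of hosts with consumer tasks
   * @param fraction assumed config value between 0 and 1
   * @param floor minimum limit, so that tiny clusters do not get a limit of 0
   */
  static int maxAllowedHosts(int hostsRunningDownstreamTasks, double fraction, int floor) {
    int scaled = (int) Math.ceil(hostsRunningDownstreamTasks * fraction);
    return Math.max(floor, scaled);
  }
}
```

   Recomputing this whenever the host count changes would let the limit follow a cluster that grows or shrinks during the DAG.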

##########
File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
##########
@@ -264,6 +264,13 @@
 
   final ServicePluginInfo servicePluginInfo;
 
+  /*
+   * For every upstream host (as map keys) contains every unique downstream hostnames that reported INPUT_READ_ERROR.
+   * This map helps to decide if there is a problem with the host that produced the map outputs. There is an assumption
+   * that if multiple downstream hosts report input errors for the same upstream host, then it's likely that the output
+   * has to be blamed and needs to rerun.
+   */
+  private final Map<String, Set<String>> downstreamBlamingHosts = Maps.newHashMap();

Review comment:
       Please refer to the previous comment on memory pressure in the AM.



