You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/02/24 10:04:20 UTC

[tez] branch master updated: TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e194cb56 TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)
3e194cb56 is described below

commit 3e194cb56fa38099e0b9c650682398e8a0ef93a0
Author: mudit-97 <32...@users.noreply.github.com>
AuthorDate: Fri Feb 24 15:34:12 2023 +0530

    TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)
---
 .../org/apache/tez/dag/api/TezConfiguration.java   |  12 ++
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |  26 +++-
 .../org/apache/tez/dag/app/TestDAGAppMaster.java   | 156 +++++++++++++++++----
 3 files changed, 162 insertions(+), 32 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cd6d02249..6d7783624 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1828,6 +1828,18 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "dag.recovery.enabled";
   public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
 
+
+  /**
+   * Boolean value. When set to true, a restarted app master fails if DAG recovery is enabled
+   * but it did not find anything to recover. Disabled by default.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA =
+          TEZ_AM_PREFIX + "failure.on.missing.recovery.data";
+  public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT = false;
+
   /**
    * Int value. Size in bytes for the IO buffer size while processing the recovery file.
    * Expert level setting.
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index a5d7b7db9..2ef72f8c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1916,10 +1916,7 @@ public class DAGAppMaster extends AbstractService {
           LOG.info("Recovering data from previous attempts"
               + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
           this.state = DAGAppMasterState.RECOVERING;
-          RecoveryParser recoveryParser = new RecoveryParser(
-              this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
-          DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
-          return recoveredDAGData;
+          return parseDAGFromRecoveryData();
         }
       } finally {
         hadoopShim.clearHadoopCallerContext();
@@ -1928,6 +1925,27 @@ public class DAGAppMaster extends AbstractService {
     return null;
   }
 
+  /**
+   * Parses the recovery data of previous attempts. The parser can return null when the AM shut down
+   * prematurely during the first attempt due to a FATAL error, so the recovery stream was never closed and
+   * no data was flushed to the file system (https://issues.apache.org/jira/browse/TEZ-4474).
+   * @return the parsed data; null if nothing was found and TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA is off
+   * @throws IOException when nothing was found and that config is on; callers are assumed to fail the DAG
+   */
+  private DAGRecoveryData parseDAGFromRecoveryData() throws IOException {
+    RecoveryParser recoveryParser = new RecoveryParser(
+            this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+    DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
+    // Nothing recovered: fail only when explicitly configured to, otherwise hand the null back
+    if(Objects.isNull(recoveredDAGData) && amConf.getBoolean(
+            TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA,
+            TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT)) {
+      throw new IOException(String.format("Found nothing to recover in currentAttemptId=%s from recovery data dir=%s",
+              this.appAttemptID.getAttemptId(), this.recoveryDataDir));
+    }
+    return recoveredDAGData;
+  }
+
   @Override
   public void serviceStart() throws Exception {
     //start all the components
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index d8167dbcc..9fe8e3e72 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -14,54 +14,32 @@
 
 package org.apache.tez.dag.app;
 
-import org.apache.hadoop.yarn.util.MonotonicClock;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.records.TezVertexID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tez.common.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.TezApiVersionInfo;
+import org.apache.tez.common.Preconditions;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -78,13 +56,44 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestDAGAppMaster {
 
@@ -332,6 +341,97 @@ public class TestDAGAppMaster {
     assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
   }
 
+  @Test(timeout = 60000)
+  public void testShutdownTezAMWithMissingRecoveryAndFailureOnMissingData() throws Exception {
+    // Recovery is enabled and the AM is configured to fail when the recovery data is missing
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+    conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+    conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+
+    /*
+     * Use a very high DAG submit timeout: with TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA set, the AM should
+     * shut down because of the missing recovery data, without the session timeout flow kicking in.
+     */
+    conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1000000000);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+    // Mocked recovery file system on which no path exists, i.e. there is nothing to recover
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.exists(any())).thenReturn(false);
+
+    DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
+    // After init(), replace the superclass field recoveryFS with the mock through reflection
+    dam.init(conf);
+    Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
+    field.setAccessible(true);
+    field.set(dam, mockFs);
+
+    dam.start();
+
+    ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+    // Verifies that the recovery file system was probed exactly twice; the mock answers false both times
+    verify(mockFs, times(2)).exists(captor.capture());
+    // Captured in call order: index 0 is the RecoveryFatalErrorOccurred path, index 1 the summary path
+    Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary"));
+    Assert.assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
+
+    verify(dam.mockScheduler).setShouldUnregisterFlag();
+    verify(dam.mockShutdown).shutdown();
+
+    /*
+     * Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is set,
+     * the AM ends up in ERROR state when the recovery data is missing for attempts > 1
+     */
+    assertEquals(DAGAppMasterState.ERROR, dam.getState());
+  }
+
+  @Test
+  public void testShutdownTezAMWithMissingRecoveryAndNoFailureOnMissingData() throws Exception {
+    // Recovery is enabled, but missing recovery data is NOT configured to fail the AM
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+    conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, false);
+    conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+    // Mocked recovery file system on which no path exists, i.e. there is nothing to recover
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.exists(any())).thenReturn(false);
+
+    DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
+    // After init(), replace the superclass field recoveryFS with the mock through reflection
+    dam.init(conf);
+    Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
+    field.setAccessible(true);
+    field.set(dam, mockFs);
+
+    dam.start();
+    // Wait for the DAG submit (session) timeout to kick in; it is set to 1 s above
+    Thread.sleep(2000);
+
+    ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+    // Verifies that the recovery file system was probed exactly twice; the mock answers false both times
+    verify(mockFs, times(2)).exists(captor.capture());
+    // Captured in call order: index 0 is the RecoveryFatalErrorOccurred path, index 1 the summary path
+    Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary"));
+    Assert.assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
+
+    verify(dam.mockScheduler).setShouldUnregisterFlag();
+    verify(dam.mockShutdown).shutdown();
+
+    /*
+     * Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is unset,
+     * the AM ends up in SUCCEEDED state when the recovery data is missing and the timeout fires for attempts > 1
+     */
+    assertEquals(DAGAppMasterState.SUCCEEDED, dam.getState());
+  }
+
   private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map,
                                 int numExpected, boolean verifyPayload,
                                 String... expectedNames) throws