You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/02/24 10:04:20 UTC
[tez] branch master updated: TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 3e194cb56 TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)
3e194cb56 is described below
commit 3e194cb56fa38099e0b9c650682398e8a0ef93a0
Author: mudit-97 <32...@users.noreply.github.com>
AuthorDate: Fri Feb 24 15:34:12 2023 +0530
TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor)
---
.../org/apache/tez/dag/api/TezConfiguration.java | 12 ++
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 26 +++-
.../org/apache/tez/dag/app/TestDAGAppMaster.java | 156 +++++++++++++++++----
3 files changed, 162 insertions(+), 32 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cd6d02249..6d7783624 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1828,6 +1828,18 @@ public class TezConfiguration extends Configuration {
TEZ_PREFIX + "dag.recovery.enabled";
public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
+
+ /**
+ * Boolean value. When true and DAG recovery is enabled, a restarted AM attempt that finds no recovery
+ * data fails (recovery throws an IOException) instead of continuing as if there were nothing to recover.
+ * Expert level setting. Defaults to false.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="boolean")
+ public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA =
+ TEZ_AM_PREFIX + "failure.on.missing.recovery.data";
+ public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT = false;
+
/**
* Int value. Size in bytes for the IO buffer size while processing the recovery file.
* Expert level setting.
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index a5d7b7db9..2ef72f8c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1916,10 +1916,7 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Recovering data from previous attempts"
+ ", currentAttemptId=" + this.appAttemptID.getAttemptId());
this.state = DAGAppMasterState.RECOVERING;
- RecoveryParser recoveryParser = new RecoveryParser(
- this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
- DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
- return recoveredDAGData;
+ return parseDAGFromRecoveryData();
}
} finally {
hadoopShim.clearHadoopCallerContext();
@@ -1928,6 +1925,27 @@ public class DAGAppMaster extends AbstractService {
return null;
}
+ /**
+ * Parses the recovery data written by previous attempts. The result can be null when the first attempt shut
+ * down prematurely on a FATAL error: the recovery stream was never closed, so nothing was flushed to the FS.
+ * If TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA is enabled, that case throws an IOException instead, which the
+ * caller is expected to catch and turn into a DAG failure (JIRA: TEZ-4474); otherwise null is returned.
+ * @return the recovered DAG data, or null if none was found and the failure config is disabled
+ * @throws IOException if nothing was found while the failure config is enabled, or if the parser throws one
+ */
+ private DAGRecoveryData parseDAGFromRecoveryData() throws IOException {
+ RecoveryParser recoveryParser = new RecoveryParser(
+ this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+ DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
+ if (recoveredDAGData == null && amConf.getBoolean(
+ TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA,
+ TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT)) {
+ throw new IOException(String.format("Found nothing to recover in currentAttemptId=%s from recovery data dir=%s",
+ this.appAttemptID.getAttemptId(), this.recoveryDataDir));
+ }
+ return recoveredDAGData;
+ }
+
@Override
public void serviceStart() throws Exception {
//start all the components
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index d8167dbcc..9fe8e3e72 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -14,54 +14,32 @@
package org.apache.tez.dag.app;
-import org.apache.hadoop.yarn.util.MonotonicClock;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.records.TezVertexID;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tez.common.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezApiVersionInfo;
+import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -78,13 +56,44 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestDAGAppMaster {
@@ -332,6 +341,97 @@ public class TestDAGAppMaster {
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
}
+ @Test(timeout = 60000)
+ public void testShutdownTezAMWithMissingRecoveryAndFailureOnMissingData() throws Exception {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+
+ /*
+ * Use a practically infinite DAG-submit timeout: with TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA set, the
+ * AM must shut down right away from the recovery failure, never through the session-timeout path.
+ */
+ conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1000000000);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+
+ FileSystem mockFs = mock(FileSystem.class);
+ when(mockFs.exists(any())).thenReturn(false);
+
+ DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
+
+ dam.init(conf);
+ // recoveryFS is declared on DAGAppMaster; look it up there rather than via getSuperclass()
+ Field field = DAGAppMaster.class.getDeclaredField("recoveryFS");
+ field.setAccessible(true);
+ field.set(dam, mockFs);
+ dam.start();
+
+ // The recovery FS is probed exactly twice, and the stub reports every path as missing
+ ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+ verify(mockFs, times(2)).exists(captor.capture());
+
+ // First the fatal-error marker of attempt 1, then its summary file
+ assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
+ assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary"));
+
+ verify(dam.mockScheduler).setShouldUnregisterFlag();
+ verify(dam.mockShutdown).shutdown();
+
+ /*
+ * With TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA set, a later attempt (attempt id > 1) that finds no
+ * recovery data leaves the AM in the ERROR state.
+ */
+ assertEquals(DAGAppMasterState.ERROR, dam.getState());
+ }
+
+ @Test(timeout = 60000)
+ public void testShutdownTezAMWithMissingRecoveryAndNoFailureOnMissingData() throws Exception {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, false);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+ // Short DAG-submit timeout: without the failure config, the session timeout is what ends the AM
+ conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+
+ FileSystem mockFs = mock(FileSystem.class);
+ when(mockFs.exists(any())).thenReturn(false);
+
+ DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
+ dam.init(conf);
+ // recoveryFS is declared on DAGAppMaster; look it up there rather than via getSuperclass()
+ Field field = DAGAppMaster.class.getDeclaredField("recoveryFS");
+ field.setAccessible(true);
+ field.set(dam, mockFs);
+
+ dam.start();
+ // Wait (bounded) for the 1 s session timeout to shut the AM down, instead of sleeping a fixed interval
+ verify(dam.mockShutdown, org.mockito.Mockito.timeout(30000)).shutdown();
+
+ // The recovery FS is probed exactly twice, and the stub reports every path as missing
+ ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+ verify(mockFs, times(2)).exists(captor.capture());
+
+ // First the fatal-error marker of attempt 1, then its summary file
+ assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
+ assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary"));
+
+ verify(dam.mockScheduler).setShouldUnregisterFlag();
+
+ /*
+ * With TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA unset, missing recovery data is not an error: the
+ * session timeout shuts the AM down and it ends in the SUCCEEDED state.
+ */
+ assertEquals(DAGAppMasterState.SUCCEEDED, dam.getState());
+ }
+
private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map,
int numExpected, boolean verifyPayload,
String... expectedNames) throws