You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2018/09/21 14:27:58 UTC
tez git commit: TEZ-3982. DAGAppMaster and tasks should not report
negative or invalid progress (Kuhu Shukla via jlowe)
Repository: tez
Updated Branches:
refs/heads/branch-0.9 29f654e2b -> 9f13abde3
TEZ-3982. DAGAppMaster and tasks should not report negative or invalid progress (Kuhu Shukla via jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9f13abde
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9f13abde
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9f13abde
Branch: refs/heads/branch-0.9
Commit: 9f13abde3efb238228d8493452884af3bf11e5b3
Parents: 29f654e
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Sep 21 09:26:48 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Sep 21 09:26:48 2018 -0500
----------------------------------------------------------------------
.../org/apache/tez/common/ProgressHelper.java | 5 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++-
.../apache/tez/dag/app/TestDAGAppMaster.java | 82 ++++++++++++++++++++
4 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
index 407a20e..07b066c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
@@ -47,7 +47,10 @@ public class ProgressHelper {
if (inputs != null && inputs.size() != 0) {
for (LogicalInput input : inputs.values()) {
if (input instanceof AbstractLogicalInput) {
- progSum += ((AbstractLogicalInput) input).getProgress();
+ float inputProgress = ((AbstractLogicalInput) input).getProgress();
+ if (inputProgress >= 0.0f && inputProgress <= 1.0f) {
+ progSum += inputProgress;
+ }
}
}
progress = (1.0f) * progSum / inputs.size();
http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c4b8df0..7ff47fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1271,7 +1271,7 @@ public class DAGAppMaster extends AbstractService {
}
public float getProgress() {
- if (isSession && state.equals(DAGAppMasterState.IDLE)) {
+ if (isSession && getState().equals(DAGAppMasterState.IDLE)) {
return 0.0f;
}
if(currentDAG != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ecd8d17..8cb39a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -804,9 +804,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
try {
float progress = 0.0f;
for (Vertex v : getVertices().values()) {
- progress += v.getProgress();
+ float vertexProgress = v.getProgress();
+ if (vertexProgress >= 0.0f && vertexProgress <= 1.0f) {
+ progress += vertexProgress;
+ }
+ }
+ float dagProgress = progress / getTotalVertices();
+ if (dagProgress >= 0.0f && progress <= 1.0f) {
+ return dagProgress;
+ } else {
+ return 0.0f;
}
- return progress / getTotalVertices();
} finally {
this.readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9f13abde/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 570c6dc..56c8a72 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -14,12 +14,17 @@
package org.apache.tez.dag.app;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezVertexID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -29,8 +34,10 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
@@ -387,6 +394,81 @@ public class TestDAGAppMaster {
testDagCredentials(true);
}
+ @Test
+ public void testBadProgress() throws Exception {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ // create some sample AM credentials
+ Credentials amCreds = new Credentials();
+ JobTokenSecretManager jtsm = new JobTokenSecretManager();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(
+ new Text(appId.toString()));
+ Token<JobTokenIdentifier> sessionToken =
+ new Token<JobTokenIdentifier>(identifier, jtsm);
+ sessionToken.setService(identifier.getJobId());
+ TokenCache.setSessionToken(sessionToken, amCreds);
+ TestTokenSecretManager ttsm = new TestTokenSecretManager();
+ Text tokenAlias1 = new Text("alias1");
+ Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("amtoken1")), ttsm);
+ amCreds.addToken(tokenAlias1, amToken1);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ FSDataOutputStream sessionJarsPBOutStream =
+ TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
+ TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+ DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
+ .writeDelimitedTo(sessionJarsPBOutStream);
+ sessionJarsPBOutStream.close();
+ DAGAppMaster am = spy(new DAGAppMaster(attemptId,
+ ContainerId.newContainerId(attemptId, 1),
+ "127.0.0.1", 0, 0, new SystemClock(), 1, true,
+ TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
+ new String[] {TEST_DIR.toString()},
+ new TezApiVersionInfo().getVersion(), amCreds,
+ "someuser", null));
+ when(am.getState()).thenReturn(DAGAppMasterState.RUNNING);
+ am.init(conf);
+ am.start();
+ Credentials dagCreds = new Credentials();
+ Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("dagtoken1")), ttsm);
+ dagCreds.addToken(tokenAlias1, dagToken1);
+ Text tokenAlias3 = new Text("alias3");
+ Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("dagtoken2")), ttsm);
+ dagCreds.addToken(tokenAlias3, dagToken2);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ DAGPlan dagPlan = DAGPlan.newBuilder()
+ .setName("somedag")
+ .setCredentialsBinary(
+ DagTypeConverters.convertCredentialsToProto(dagCreds))
+ .build();
+ DAGImpl dag = spy(am.createDAG(dagPlan, dagId));
+ am.setCurrentDAG(dag);
+ when(dag.getState()).thenReturn(DAGState.RUNNING);
+ Map<TezVertexID, Vertex> map = new HashMap<TezVertexID, Vertex>();
+ TezVertexID mockVertexID = mock(TezVertexID.class);
+ Vertex mockVertex = mock(Vertex.class);
+ when(mockVertex.getProgress()).thenReturn(Float.NaN);
+ map.put(mockVertexID, mockVertex);
+ when(dag.getVertices()).thenReturn(map);
+ when(dag.getTotalVertices()).thenReturn(1);
+ Assert.assertEquals("Progress was NaN and should be reported as 0",
+ 0, am.getProgress(), 0);
+ when(mockVertex.getProgress()).thenReturn(-10f);
+ Assert.assertEquals("Progress was negative and should be reported as 0",
+ 0, am.getProgress(), 0);
+ when(mockVertex.getProgress()).thenReturn(10f);
+ Assert.assertEquals("Progress was greater than 1 and should be reported as 0",
+ 0, am.getProgress(), 0);
+ }
+
@SuppressWarnings("deprecation")
private void testDagCredentials(boolean doMerge) throws IOException {
TezConfiguration conf = new TezConfiguration();