You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2017/01/16 04:16:01 UTC
tez git commit: TEZ-3574. Container reuse won't pick up extra DAG-level
local resources. Contributed by Zhiyuan Yang.
Repository: tez
Updated Branches:
refs/heads/master 81dacf58e -> abb350c0c
TEZ-3574. Container reuse won't pick up extra DAG-level local resources.
Contributed by Zhiyuan Yang.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/abb350c0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/abb350c0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/abb350c0
Branch: refs/heads/master
Commit: abb350c0c1b397fe05e0de403d706e10044af398
Parents: 81dacf5
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jan 15 20:15:38 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jan 15 20:15:38 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../main/java/org/apache/tez/dag/api/DAG.java | 15 +++++
.../org/apache/tez/dag/api/TestDAGVerify.java | 25 ++++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 +
.../app/rm/container/AMContainerHelpers.java | 16 ++---
.../dag/app/rm/container/AMContainerImpl.java | 4 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 65 ++++++++++++++++++++
7 files changed, 113 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f275350..cfc5214 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3574. Container reuse won't pickup extra dag level local resource.
TEZ-3443. Remove a repeated/unused method from MRTask.
TEZ-3551. FrameworkClient created twice causing minor delay.
TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
@@ -168,6 +169,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3574. Container reuse won't pickup extra dag level local resource.
TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
TEZ-3568. Update SecurityUtils configuration to pick user provided configuration.
TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index f15c1fb..c136811 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -685,6 +685,21 @@ public class DAG {
}
}
+ // check for conflicts between dag level local resource and vertex level local resource
+ for (Vertex v : vertices.values()) {
+ for (Map.Entry<String, LocalResource> localResource : v.getTaskLocalFiles().entrySet()) {
+ String resourceName = localResource.getKey();
+ LocalResource resource = localResource.getValue();
+ if (commonTaskLocalFiles.containsKey(resourceName)
+ && !commonTaskLocalFiles.get(resourceName).equals(resource)) {
+ throw new IllegalStateException("There is conflicting local resource (" + resourceName
+ + ") between dag local resource and vertex " + v.getName() + " local resource. "
+ + "\nResource of dag : " + commonTaskLocalFiles.get(resourceName)
+ + "\nResource of vertex: " + resource);
+ }
+ }
+ }
+
return topologicalVertexStack;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 794a597..5706542 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1166,4 +1167,28 @@ public class TestDAGVerify {
dag.verify();
}
+ @Test(timeout = 5000)
+ public void testDAGWithConflictingResource() {
+ DAG dag = DAG.create("dag");
+ Map<String, LocalResource> localResourceMap = new HashMap<>();
+ String commonResourceKey = "local resource";
+ localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION, 0, 0));
+ dag.addTaskLocalFiles(localResourceMap);
+
+ Vertex v1 = Vertex.create("v", ProcessorDescriptor.create(dummyProcessorClassName), 1);
+ // same key but different resource
+ localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION, 10, 0));
+ v1.addTaskLocalFiles(localResourceMap);
+
+ dag.addVertex(v1);
+
+ try {
+ dag.verify();
+ Assert.fail("should report failure on conflict resources");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("There is conflicting local resource"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bf291b7..4cda98d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -906,6 +906,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.localResources = DagTypeConverters
.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
.getLocalResourceList());
+ this.localResources.putAll(dag.getLocalResources());
this.environment = DagTypeConverters
.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
.getEnvironmentSettingList());
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11b5006..3bac7b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -94,7 +94,7 @@ public class AMContainerHelpers {
*/
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
- Credentials credentials, Map<String, LocalResource> localResources) {
+ Credentials credentials) {
// Application environment
Map<String, String> environment = new HashMap<String, String>();
@@ -137,7 +137,7 @@ public class AMContainerHelpers {
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container =
- ContainerLaunchContext.newInstance(localResources, environment, null,
+ ContainerLaunchContext.newInstance(null, environment, null,
serviceData, containerCredentialsBuffer, applicationACLs);
return container;
}
@@ -145,7 +145,6 @@ public class AMContainerHelpers {
@VisibleForTesting
public static ContainerLaunchContext createContainerLaunchContext(
TezDAGID tezDAGID,
- Map<String, LocalResource> commonDAGLRs,
Map<ApplicationAccessType, String> acls,
ContainerId containerId,
Map<String, LocalResource> localResources,
@@ -159,7 +158,7 @@ public class AMContainerHelpers {
synchronized (commonContainerSpecLock) {
if (!commonContainerSpecs.containsKey(tezDAGID)) {
commonContainerSpec =
- createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
+ createCommonContainerLaunchContext(acls, credentials);
commonContainerSpecs.put(tezDAGID, commonContainerSpec);
} else {
commonContainerSpec = commonContainerSpecs.get(tezDAGID);
@@ -175,13 +174,6 @@ public class AMContainerHelpers {
}
}
- // Fill in the fields needed per-container that are missing in the common
- // spec.
- Map<String, LocalResource> lResources =
- new TreeMap<String, LocalResource>();
- lResources.putAll(commonContainerSpec.getLocalResources());
- lResources.putAll(localResources);
-
// Setup environment by cloning from common env.
Map<String, String> env = commonContainerSpec.getEnvironment();
Map<String, String> myEnv = new HashMap<String, String>(env.size());
@@ -214,7 +206,7 @@ public class AMContainerHelpers {
// Construct the actual Container
ContainerLaunchContext container =
- ContainerLaunchContext.newInstance(lResources, myEnv, commands,
+ ContainerLaunchContext.newInstance(localResources, myEnv, commands,
myServiceData, commonContainerSpec.getTokens().duplicate(), acls);
return container;
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 94c8fe0..5d73a7b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -460,10 +460,8 @@ public class AMContainerImpl implements AMContainer {
container.credentialsChanged = true;
TezDAGID dagId = null;
- Map<String, LocalResource> dagLocalResources = null;
if (container.appContext.getCurrentDAG() != null) {
dagId = container.appContext.getCurrentDAG().getID();
- dagLocalResources = container.appContext.getCurrentDAG().getLocalResources();
}
// TODO TEZ-2625 This should ideally be handled inside of user code. Will change once
@@ -489,7 +487,7 @@ public class AMContainerImpl implements AMContainer {
}
ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
- dagId, dagLocalResources,
+ dagId,
container.appContext.getApplicationACLs(),
container.getContainerId(),
containerContext.getLocalResources(),
http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a11311d..90d675e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -51,14 +51,24 @@ import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
+import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.VertexStatistics;
@@ -254,6 +264,7 @@ public class TestVertexImpl {
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
+ private AMSchedulerEventDispatcher amSchedulerEventDispatcher;
private HistoryEventHandler historyEventHandler;
private StateChangeNotifierForTest updateTracker;
private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
@@ -414,6 +425,14 @@ public class TestVertexImpl {
}
}
+ private class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> {
+ List<AMSchedulerEvent> events = new ArrayList<>();
+
+ public void handle(AMSchedulerEvent event) {
+ events.add(event);
+ }
+ }
+
private class VertexEventDispatcher
implements EventHandler<VertexEvent> {
@@ -1619,6 +1638,16 @@ public class TestVertexImpl {
LOG.info("Setting up dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
+ .addLocalResource(
+ PlanLocalResource.newBuilder()
+ .setName("dag lr")
+ .setUri("dag ir uri")
+ .setSize(1)
+ .setTimeStamp(1)
+ .setType(PlanLocalResourceType.FILE)
+ .setVisibility(PlanLocalResourceVisibility.APPLICATION)
+ .build()
+ )
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
@@ -1636,6 +1665,16 @@ public class TestVertexImpl {
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x1.y1")
+ .addLocalResource(
+ PlanLocalResource.newBuilder()
+ .setName("vertex lr")
+ .setUri("vertex ir uri")
+ .setSize(1)
+ .setTimeStamp(1)
+ .setType(PlanLocalResourceType.FILE)
+ .setVisibility(PlanLocalResourceVisibility.APPLICATION)
+ .build()
+ )
.build()
)
.addOutEdgeId("e1")
@@ -2474,6 +2513,13 @@ public class TestVertexImpl {
DAG dag = mock(DAG.class);
doReturn(ugi).when(dag).getDagUGI();
doReturn(dagName).when(dag).getName();
+ Map<String, LocalResource> localResources = new HashMap<>();
+ for (PlanLocalResource planLR : dagPlan.getLocalResourceList()) {
+ localResources.put(planLR.getName(),
+ DagTypeConverters.convertPlanLocalResourceToLocalResource(planLR));
+ }
+ when(dag.getLocalResources()).thenReturn(localResources);
+
doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
doReturn(dag).when(appContext).getCurrentDAG();
@@ -2559,6 +2605,8 @@ public class TestVertexImpl {
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dagEventDispatcher = new DagEventDispatcher();
dispatcher.register(DAGEventType.class, dagEventDispatcher);
+ amSchedulerEventDispatcher = new AMSchedulerEventDispatcher();
+ dispatcher.register(AMSchedulerEventType.class, amSchedulerEventDispatcher);
dispatcher.init(conf);
dispatcher.start();
}
@@ -7057,4 +7105,21 @@ public class TestVertexImpl {
Assert.assertTrue(v.getLastTaskFinishTime() > 0);
}
+ @Test(timeout = 5000)
+ public void testPickupDagLocalResourceOnScheduleTask() {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v1 = vertices.get("vertex1");
+ startVertex(v1);
+
+ TezTaskAttemptID taskAttemptId0 = TezTaskAttemptID.getInstance(v1.getTask(0).getTaskId(), 0);
+ TaskAttemptImpl ta0 = (TaskAttemptImpl) v1.getTask(0).getAttempt(taskAttemptId0);
+ ta0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 1, 1));
+
+ dispatcher.await();
+ Assert.assertEquals(1, amSchedulerEventDispatcher.events.size());
+ AMSchedulerEventTALaunchRequest launchRequestEvent = (AMSchedulerEventTALaunchRequest) amSchedulerEventDispatcher.events.get(0);
+ Map<String, LocalResource> localResourceMap = launchRequestEvent.getContainerContext().getLocalResources();
+ Assert.assertTrue(localResourceMap.containsKey("dag lr"));
+ Assert.assertTrue(localResourceMap.containsKey("vertex lr"));
+ }
}