You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2017/01/16 04:16:01 UTC

tez git commit: TEZ-3574. Container reuse won't pickup extra dag level local resource. Contributed by Zhiyuan Yang.

Repository: tez
Updated Branches:
  refs/heads/master 81dacf58e -> abb350c0c


TEZ-3574. Container reuse won't pickup extra dag level local resource.
Contributed by Zhiyuan Yang.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/abb350c0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/abb350c0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/abb350c0

Branch: refs/heads/master
Commit: abb350c0c1b397fe05e0de403d706e10044af398
Parents: 81dacf5
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jan 15 20:15:38 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jan 15 20:15:38 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../main/java/org/apache/tez/dag/api/DAG.java   | 15 +++++
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 25 ++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  1 +
 .../app/rm/container/AMContainerHelpers.java    | 16 ++---
 .../dag/app/rm/container/AMContainerImpl.java   |  4 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 65 ++++++++++++++++++++
 7 files changed, 113 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f275350..cfc5214 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3574. Container reuse won't pickup extra dag level local resource.
   TEZ-3443. Remove a repeated/unused method from MRTask.
   TEZ-3551. FrameworkClient created twice causing minor delay.
   TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
@@ -168,6 +169,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3574. Container reuse won't pickup extra dag level local resource.
   TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
   TEZ-3568. Update SecurityUtils configuration to pick user provided configuration.
   TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index f15c1fb..c136811 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -685,6 +685,21 @@ public class DAG {
       }
     }
 
+    // check for conflicts between dag level local resource and vertex level local resource
+    for (Vertex v : vertices.values()) {
+      for (Map.Entry<String, LocalResource> localResource : v.getTaskLocalFiles().entrySet()) {
+        String resourceName = localResource.getKey();
+        LocalResource resource = localResource.getValue();
+        if (commonTaskLocalFiles.containsKey(resourceName)
+          && !commonTaskLocalFiles.get(resourceName).equals(resource)) {
+          throw new IllegalStateException("There is conflicting local resource (" + resourceName
+            + ") between dag local resource and vertex " + v.getName() + " local resource. "
+            + "\nResource of dag : " + commonTaskLocalFiles.get(resourceName)
+            + "\nResource of vertex: " + resource);
+        }
+      }
+    }
+
     return topologicalVertexStack;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 794a597..5706542 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1166,4 +1167,28 @@ public class TestDAGVerify {
     dag.verify();
   }
 
+  @Test(timeout = 5000)
+  public void testDAGWithConflictingResource() {
+    DAG dag = DAG.create("dag");
+    Map<String, LocalResource> localResourceMap = new HashMap<>();
+    String commonResourceKey = "local resource";
+    localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE,
+      LocalResourceVisibility.APPLICATION, 0, 0));
+    dag.addTaskLocalFiles(localResourceMap);
+
+    Vertex v1 = Vertex.create("v", ProcessorDescriptor.create(dummyProcessorClassName), 1);
+    // same key but different resource
+    localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE,
+      LocalResourceVisibility.APPLICATION, 10, 0));
+    v1.addTaskLocalFiles(localResourceMap);
+
+    dag.addVertex(v1);
+
+    try {
+      dag.verify();
+      Assert.fail("should report failure on conflict resources");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("There is conflicting local resource"));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bf291b7..4cda98d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -906,6 +906,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     this.localResources = DagTypeConverters
         .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getLocalResourceList());
+    this.localResources.putAll(dag.getLocalResources());
     this.environment = DagTypeConverters
         .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getEnvironmentSettingList());

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11b5006..3bac7b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -94,7 +94,7 @@ public class AMContainerHelpers {
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      Credentials credentials, Map<String, LocalResource> localResources) {
+      Credentials credentials) {
 
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
@@ -137,7 +137,7 @@ public class AMContainerHelpers {
     // The null fields are per-container and will be constructed for each
     // container separately.
     ContainerLaunchContext container =
-        ContainerLaunchContext.newInstance(localResources, environment, null,
+        ContainerLaunchContext.newInstance(null, environment, null,
             serviceData, containerCredentialsBuffer, applicationACLs);
     return container;
   }
@@ -145,7 +145,6 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       TezDAGID tezDAGID,
-      Map<String, LocalResource> commonDAGLRs,
       Map<ApplicationAccessType, String> acls,
       ContainerId containerId,
       Map<String, LocalResource> localResources,
@@ -159,7 +158,7 @@ public class AMContainerHelpers {
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
         commonContainerSpec =
-            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
+            createCommonContainerLaunchContext(acls, credentials);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);
       } else {
         commonContainerSpec = commonContainerSpecs.get(tezDAGID);
@@ -175,13 +174,6 @@ public class AMContainerHelpers {
       }
     }
 
-    // Fill in the fields needed per-container that are missing in the common
-    // spec.
-    Map<String, LocalResource> lResources =
-        new TreeMap<String, LocalResource>();
-    lResources.putAll(commonContainerSpec.getLocalResources());
-    lResources.putAll(localResources);
-
     // Setup environment by cloning from common env.
     Map<String, String> env = commonContainerSpec.getEnvironment();
     Map<String, String> myEnv = new HashMap<String, String>(env.size());
@@ -214,7 +206,7 @@ public class AMContainerHelpers {
 
     // Construct the actual Container
     ContainerLaunchContext container =
-        ContainerLaunchContext.newInstance(lResources, myEnv, commands,
+        ContainerLaunchContext.newInstance(localResources, myEnv, commands,
             myServiceData, commonContainerSpec.getTokens().duplicate(), acls);
 
     return container;

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 94c8fe0..5d73a7b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -460,10 +460,8 @@ public class AMContainerImpl implements AMContainer {
       container.credentialsChanged = true;
 
       TezDAGID dagId = null;
-      Map<String, LocalResource> dagLocalResources = null;
       if (container.appContext.getCurrentDAG() != null) {
         dagId = container.appContext.getCurrentDAG().getID();
-        dagLocalResources = container.appContext.getCurrentDAG().getLocalResources();
       }
 
       // TODO TEZ-2625 This should ideally be handled inside of user code. Will change once
@@ -489,7 +487,7 @@ public class AMContainerImpl implements AMContainer {
       }
 
       ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
-          dagId, dagLocalResources,
+          dagId,
           container.appContext.getApplicationACLs(),
           container.getContainerId(),
           containerContext.getLocalResources(),

http://git-wip-us.apache.org/repos/asf/tez/blob/abb350c0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a11311d..90d675e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -51,14 +51,24 @@ import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.dag.app.rm.AMSchedulerEvent;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.VertexStatistics;
@@ -254,6 +264,7 @@ public class TestVertexImpl {
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
+  private AMSchedulerEventDispatcher amSchedulerEventDispatcher;
   private HistoryEventHandler historyEventHandler;
   private StateChangeNotifierForTest updateTracker;
   private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
@@ -414,6 +425,14 @@ public class TestVertexImpl {
     }
   }
 
+  private class AMSchedulerEventDispatcher implements  EventHandler<AMSchedulerEvent> {
+    List<AMSchedulerEvent> events = new ArrayList<>();
+
+    public void handle(AMSchedulerEvent event) {
+      events.add(event);
+    }
+  }
+
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -1619,6 +1638,16 @@ public class TestVertexImpl {
     LOG.info("Setting up dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testverteximpl")
+        .addLocalResource(
+          PlanLocalResource.newBuilder()
+            .setName("dag lr")
+            .setUri("dag ir uri")
+            .setSize(1)
+            .setTimeStamp(1)
+            .setType(PlanLocalResourceType.FILE)
+            .setVisibility(PlanLocalResourceVisibility.APPLICATION)
+            .build()
+        )
         .addVertex(
           VertexPlan.newBuilder()
             .setName("vertex1")
@@ -1636,6 +1665,16 @@ public class TestVertexImpl {
                 .setMemoryMb(1024)
                 .setJavaOpts("")
                 .setTaskModule("x1.y1")
+                .addLocalResource(
+                  PlanLocalResource.newBuilder()
+                    .setName("vertex lr")
+                    .setUri("vertex ir uri")
+                    .setSize(1)
+                    .setTimeStamp(1)
+                    .setType(PlanLocalResourceType.FILE)
+                    .setVisibility(PlanLocalResourceVisibility.APPLICATION)
+                    .build()
+                )
                 .build()
             )
             .addOutEdgeId("e1")
@@ -2474,6 +2513,13 @@ public class TestVertexImpl {
     DAG dag = mock(DAG.class);
     doReturn(ugi).when(dag).getDagUGI();
     doReturn(dagName).when(dag).getName();
+    Map<String, LocalResource> localResources = new HashMap<>();
+    for (PlanLocalResource planLR : dagPlan.getLocalResourceList()) {
+      localResources.put(planLR.getName(),
+        DagTypeConverters.convertPlanLocalResourceToLocalResource(planLR));
+    }
+    when(dag.getLocalResources()).thenReturn(localResources);
+
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
     doReturn(dag).when(appContext).getCurrentDAG();
@@ -2559,6 +2605,8 @@ public class TestVertexImpl {
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    amSchedulerEventDispatcher = new AMSchedulerEventDispatcher();
+    dispatcher.register(AMSchedulerEventType.class, amSchedulerEventDispatcher);
     dispatcher.init(conf);
     dispatcher.start();
   }
@@ -7057,4 +7105,21 @@ public class TestVertexImpl {
     Assert.assertTrue(v.getLastTaskFinishTime() > 0);
   }
 
+  @Test(timeout = 5000)
+  public void testPickupDagLocalResourceOnScheduleTask() {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+
+    TezTaskAttemptID taskAttemptId0 = TezTaskAttemptID.getInstance(v1.getTask(0).getTaskId(), 0);
+    TaskAttemptImpl ta0 = (TaskAttemptImpl) v1.getTask(0).getAttempt(taskAttemptId0);
+    ta0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 1, 1));
+
+    dispatcher.await();
+    Assert.assertEquals(1, amSchedulerEventDispatcher.events.size());
+    AMSchedulerEventTALaunchRequest launchRequestEvent = (AMSchedulerEventTALaunchRequest) amSchedulerEventDispatcher.events.get(0);
+    Map<String, LocalResource> localResourceMap = launchRequestEvent.getContainerContext().getLocalResources();
+    Assert.assertTrue(localResourceMap.containsKey("dag lr"));
+    Assert.assertTrue(localResourceMap.containsKey("vertex lr"));
+  }
 }