You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/12 08:39:13 UTC

[tez] branch master updated: TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a97643  TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)
4a97643 is described below

commit 4a97643cdc021b71e2a8bc60ac4fa0f7f9940d0f
Author: László Bodor <bo...@gmail.com>
AuthorDate: Tue May 12 10:36:38 2020 +0200

    TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../org/apache/tez/serviceplugins/api/DagInfo.java |  3 +++
 .../java/org/apache/tez/dag/app/AppContext.java    |  1 -
 .../tez/dag/app/TaskCommunicatorContextImpl.java   |  4 +++-
 .../main/java/org/apache/tez/dag/app/dag/DAG.java  |  4 ----
 .../api/TaskCommunicatorContext.java               |  1 -
 .../dag/app/TestTaskCommunicatorContextImpl.java   | 26 ++++++++++++++++++++++
 .../apache/tez/dag/helpers/DagInfoImplForTest.java |  6 +++++
 7 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
index 328cb62..b05fa8b 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.serviceplugins.api;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 
 import java.util.BitSet;
@@ -41,4 +42,6 @@ public interface DagInfo {
   int getTotalVertices();
 
   BitSet getVertexDescendants(int vertexIndex);
+
+  Configuration getConf();
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index b3d561a..4eb2ae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index b09eac7..faa6fe1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import org.apache.tez.common.Preconditions;
 import com.google.common.collect.Iterables;
@@ -58,7 +59,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
   private final UserPayload userPayload;
 
-  private DAG dag;
+  @VisibleForTesting
+  DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
                                      TaskCommunicatorManager taskCommunicatorManager,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 5c2eba1..280966e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -22,8 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -76,8 +74,6 @@ public interface DAG extends DagInfo {
   boolean isUber();
   String getUserName();
 
-  Configuration getConf();
-
   DAGPlan getJobPlan();
   DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
   DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions, long timeout)
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index b6f3a54..6741a36 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -228,5 +228,4 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
    * @return time when the current dag started executing
    */
   long getDagStartTime();
-
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 9f9150f..e73ccf0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestTaskCommunicatorContextImpl {
@@ -83,6 +85,30 @@ public class TestTaskCommunicatorContextImpl {
     taskCommContext1.containerAlive(containerId01);
     verify(tal, never()).containerAlive(containerId01);
     reset(tal);
+  }
+
+  @Test
+  public void testTaskCommContextReachesDAGConf() {
+    Configuration conf = new Configuration();
+    conf.set("dagkey", "dagvalue");
+
+    DAG dag = mock(DAG.class);
+    when(dag.getConf()).thenReturn(conf);
+
+    // Case 1: TaskCommunicatorContextImpl.dag is set directly (no AppContext is supplied)
+    TaskCommunicatorContextImpl commContext = new TaskCommunicatorContextImpl(null, null, null, 0);
+    commContext.dag = dag;
+
+    Assert.assertEquals("DAG config should be exposed via context.dag.getConf()",
+        "dagvalue", commContext.getCurrentDagInfo().getConf().get("dagkey")); // expected, actual
+
+    // Case 2: dag field is left unset; only appContext.getCurrentDAG() can supply the DAG
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentDAG()).thenReturn(dag);
+    commContext = new TaskCommunicatorContextImpl(appContext, null, null, 0);
 
+    Assert.assertEquals(
+        "DAG config should be exposed via context.appContext.getCurrentDAG().getConf()",
+        "dagvalue", commContext.getCurrentDagInfo().getConf().get("dagkey")); // expected, actual
   }
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
index ab446ac..26a1a0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.helpers;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.tez.serviceplugins.api.DagInfo;
 
@@ -53,4 +54,9 @@ public class DagInfoImplForTest implements DagInfo {
   public BitSet getVertexDescendants(int vertexIndex) {
     return null;
   }
+
+  @Override
+  public Configuration getConf() {
+    return null; // test stub: this helper carries no Configuration and always returns null
+  }
 }