You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/12 08:39:13 UTC
[tez] branch master updated: TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 4a97643 TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)
4a97643 is described below
commit 4a97643cdc021b71e2a8bc60ac4fa0f7f9940d0f
Author: László Bodor <bo...@gmail.com>
AuthorDate: Tue May 12 10:36:38 2020 +0200
TEZ-2672: Allow specifying a new payload for plugins when a new DAG starts (László Bodor reviewed by Rajesh Balamohan)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../org/apache/tez/serviceplugins/api/DagInfo.java | 3 +++
.../java/org/apache/tez/dag/app/AppContext.java | 1 -
.../tez/dag/app/TaskCommunicatorContextImpl.java | 4 +++-
.../main/java/org/apache/tez/dag/app/dag/DAG.java | 4 ----
.../api/TaskCommunicatorContext.java | 1 -
.../dag/app/TestTaskCommunicatorContextImpl.java | 26 ++++++++++++++++++++++
.../apache/tez/dag/helpers/DagInfoImplForTest.java | 6 +++++
7 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
index 328cb62..b05fa8b 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
@@ -14,6 +14,7 @@
package org.apache.tez.serviceplugins.api;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import java.util.BitSet;
@@ -41,4 +42,6 @@ public interface DagInfo {
int getTotalVertices();
BitSet getVertexDescendants(int vertexIndex);
+ /**
+  * Get the configuration of the DAG described by this DagInfo.
+  *
+  * @return the {@link Configuration} of the DAG
+  */
+ Configuration getConf();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index b3d561a..4eb2ae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index b09eac7..faa6fe1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Objects;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Iterables;
@@ -58,7 +59,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
private final UserPayload userPayload;
- private DAG dag;
+ @VisibleForTesting
+ DAG dag;
public TaskCommunicatorContextImpl(AppContext appContext,
TaskCommunicatorManager taskCommunicatorManager,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 5c2eba1..280966e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -22,8 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -76,8 +74,6 @@ public interface DAG extends DagInfo {
boolean isUber();
String getUserName();
- Configuration getConf();
-
DAGPlan getJobPlan();
DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions, long timeout)
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index b6f3a54..6741a36 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -228,5 +228,4 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
* @return time when the current dag started executing
*/
long getDagStartTime();
-
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 9f9150f..e73ccf0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.junit.Assert;
import org.junit.Test;
public class TestTaskCommunicatorContextImpl {
@@ -83,6 +85,30 @@ public class TestTaskCommunicatorContextImpl {
taskCommContext1.containerAlive(containerId01);
verify(tal, never()).containerAlive(containerId01);
reset(tal);
+ }
+
+ /**
+  * Checks that the DAG configuration can be read through
+  * {@code getCurrentDagInfo().getConf()}: first with the {@code dag} field set
+  * directly on the context, then with the DAG supplied only through
+  * {@code appContext.getCurrentDAG()}.
+  */
+ @Test
+ public void testTaskCommContextReachesDAGConf() {
+ Configuration conf = new Configuration();
+ conf.set("dagkey", "dagvalue");
+
+ DAG dag = mock(DAG.class);
+ when(dag.getConf()).thenReturn(conf);
+
+ // TaskCommunicatorContextImpl.dag is present
+ TaskCommunicatorContextImpl commContext = new TaskCommunicatorContextImpl(null, null, null, 0);
+ commContext.dag = dag;
+
+ // assertEquals takes (message, expected, actual); the expected literal goes first
+ // so that a failure report labels the two values correctly.
+ Assert.assertEquals("DAG config should be exposed via context.dag.getConf()",
+ "dagvalue", commContext.getCurrentDagInfo().getConf().get("dagkey"));
+
+ // TaskCommunicatorContextImpl.appContext.getCurrentDAG() is present
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentDAG()).thenReturn(dag);
+ commContext = new TaskCommunicatorContextImpl(appContext, null, null, 0);
+ Assert.assertEquals(
+ "DAG config should be exposed via context.appContext.getCurrentDAG().getConf()",
+ "dagvalue", commContext.getCurrentDagInfo().getConf().get("dagkey"));
}
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
index ab446ac..26a1a0b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.helpers;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.tez.serviceplugins.api.DagInfo;
@@ -53,4 +54,9 @@ public class DagInfoImplForTest implements DagInfo {
public BitSet getVertexDescendants(int vertexIndex) {
return null;
}
+
+ @Override
+ public Configuration getConf() {
+ return null; // test stub: this helper supplies no Configuration
+ }
}