You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/12 20:29:58 UTC
[1/2] tez git commit: TEZ-3404. Move blocking call for YARN Timeline
domain creation from client side to AM. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/master c07ec7b6f -> a23de4982
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
index f0ec1eb..d28ffe0 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -159,7 +158,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
if (domainId != null) {
// do nothing
LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
- return null;
} else {
if (!autoCreateDomain) {
// Error - Cannot fallback to default as that leaves ACLs open
@@ -169,18 +167,13 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
domainId = DOMAIN_ID_PREFIX + applicationId.toString();
createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls);
LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
- return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
}
+ return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
}
private Map<String, String> createDAGDomain(Configuration tezConf,
ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
throws IOException, HistoryACLPolicyException {
- if (dagAccessControls == null) {
- // No DAG specific ACLs
- return null;
- }
-
String domainId =
tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -198,7 +191,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
if (domainId != null) {
// do nothing
LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
- return null;
} else {
if (!autoCreateDomain) {
// Error - Cannot fallback to default as that leaves ACLs open
@@ -206,14 +198,17 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
+ " Domains is disabled");
}
+ // Create a domain only if dagAccessControls has been specified.
+ if (dagAccessControls == null) {
+ return null;
+ }
domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls);
LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
- return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
}
+ return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
}
-
@Override
public void setConf(Configuration conf) {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index dd21d2d..a095cbc 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.history.logging.ats;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -36,11 +37,16 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -80,7 +86,9 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
ATSV15HistoryLoggingService.class.getName();
private static final String atsHistoryACLManagerClassName =
"org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
- private HistoryACLPolicyManager historyACLPolicyManager;
+
+ @VisibleForTesting
+ HistoryACLPolicyManager historyACLPolicyManager;
private int numDagsPerGroup;
@@ -133,7 +141,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
if (maxTimeToWaitOnShutdown < 0) {
waitForeverOnShutdown = true;
}
- sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
LOG.info("Initializing " + ATSV15HistoryLoggingService.class.getSimpleName() + " with "
+ ", maxPollingTime(ms)=" + maxPollingTimeMillis
@@ -165,6 +172,15 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
}
timelineClient.start();
+ // create a session domain id, if it fails then disable history logging.
+ try {
+ sessionDomainId = createSessionDomain();
+ } catch (HistoryACLPolicyException | IOException e) {
+ LOG.warn("Could not setup history acls, disabling history logging.", e);
+ historyLoggingEnabled = false;
+ return;
+ }
+
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -216,9 +232,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
@Override
public void serviceStop() {
- if (!historyLoggingEnabled || timelineClient == null) {
- return;
- }
LOG.info("Stopping ATSService"
+ ", eventQueueBacklog=" + eventQueue.size());
stopped.set(true);
@@ -265,11 +278,12 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+ ", eventQueueBacklog=" + eventQueue.size());
}
- timelineClient.stop();
+ if (timelineClient != null) {
+ timelineClient.stop();
+ }
if (historyACLPolicyManager != null) {
historyACLPolicyManager.close();
}
-
}
@VisibleForTesting
@@ -331,13 +345,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
skippedDAGs.add(dagId);
return false;
}
- if (historyACLPolicyManager != null) {
- String dagDomainId = dagSubmittedEvent.getConf().get(
- TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
- if (dagDomainId != null) {
- dagDomainIdMap.put(dagId, dagDomainId);
- }
- }
}
if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent();
@@ -363,28 +370,17 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
}
private void handleEvents(DAGHistoryEvent event) {
- String domainId = sessionDomainId;
- TezDAGID dagId = event.getDagID();
-
- if (historyACLPolicyManager != null && dagId != null) {
- if (dagDomainIdMap.containsKey(dagId)) {
- domainId = dagDomainIdMap.get(dagId);
- }
+ String domainId = getDomainForEvent(event);
+ // skippedDAGs may be updated by the above call, so check again.
+ if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+ return;
}
- TimelineEntity entity =
- HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+ TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+ event.getHistoryEvent());
- if (historyACLPolicyManager != null) {
- if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
- if (domainId != null && !domainId.isEmpty()) {
- historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
- }
- } else {
- if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
- historyACLPolicyManager.updateTimelineEntityDomain(entity, sessionDomainId);
- }
- }
+ if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+ historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
}
try {
@@ -408,7 +404,99 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
} catch (Exception e) {
LOG.warn("Could not handle history events", e);
}
+ }
+ private String getDomainForEvent(DAGHistoryEvent event) {
+ String domainId = sessionDomainId;
+ if (historyACLPolicyManager == null) {
+ return domainId;
+ }
+
+ TezDAGID dagId = event.getDagID();
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+ return domainId;
+ }
+
+ if (dagDomainIdMap.containsKey(dagId)) {
+ // If we already have the domain for the dag id return it
+ domainId = dagDomainIdMap.get(dagId);
+ // Cleanup if this is the last event.
+ if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+ dagDomainIdMap.remove(dagId);
+ }
+ } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+ || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+ // In case this is the first event for the dag, create and populate dag domain.
+ Configuration conf;
+ DAGPlan dagPlan;
+ if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+ conf = ((DAGSubmittedEvent)historyEvent).getConf();
+ dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+ } else {
+ conf = appContext.getCurrentDAG().getConf();
+ dagPlan = appContext.getCurrentDAG().getJobPlan();
+ }
+ domainId = createDagDomain(conf, dagPlan, dagId);
+
+ // createDagDomain may update skippedDAGs, so check again here.
+ if (skippedDAGs.contains(dagId)) {
+ return null;
+ }
+
+ dagDomainIdMap.put(dagId, domainId);
+ }
+ return domainId;
+ }
+
+ /**
+ * Creates a domain for the session.
+ * @return domainId to be used. null if acls are disabled.
+ * @throws HistoryACLPolicyException, IOException Forwarded if historyACLPolicyManager throws.
+ */
+ private String createSessionDomain() throws IOException, HistoryACLPolicyException {
+ if (historyACLPolicyManager == null) {
+ return null;
+ }
+ Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(),
+ appContext.getApplicationID());
+ if (domainInfo != null) {
+ return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+ }
+ return null;
}
+ /**
+ * When running in session mode, create a domain for the dag and return it.
+ * @param dagConf The configuration of the dag for which the domain has to be created.
+ * @param dagPlan The dag plan which contains the ACLs.
+ * @param dagId The dagId for which domain has to be created.
+ * @return The created domain id on success.
+ * null: If there is a failure; history logging is also disabled for this dag.
+ * sessionDomainId: In non session mode, or if historyACLPolicyManager returns null.
+ */
+ private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) {
+ // In non session mode dag domain is same as session domain id.
+ if (!appContext.isSession()) {
+ return sessionDomainId;
+ }
+ DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+ ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+ : null;
+ try {
+ Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs(
+ dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()),
+ dagAccessControls);
+ if (domainInfo != null) {
+ return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+ }
+ // Fallback to session domain, if domainInfo was null
+ return sessionDomainId;
+ } catch (IOException | HistoryACLPolicyException e) {
+ LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e);
+ skippedDAGs.add(dagId);
+ // Return value is not used; the caller must check skippedDAGs instead.
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index 6f653ad..a690a19 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -300,6 +301,7 @@ public class TestATSHistoryV15 {
ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
AppContext appContext = mock(AppContext.class);
when(appContext.getApplicationID()).thenReturn(appId);
+ when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
service.setAppContext(appContext);
TimelineEntityGroupId grpId = service.getGroupId(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index 1869b56..9111195 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -22,11 +22,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,6 +51,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
@@ -66,10 +75,14 @@ public class TestATSV15HistoryLoggingService {
private static String user = "TEST_USER";
private InMemoryTimelineClient timelineClient;
+ private AppContext appContext;
@Test(timeout=2000)
public void testDAGGroupingDefault() throws Exception {
ATSV15HistoryLoggingService service = createService(-1);
+
+ service.start();
+
TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
service.handle(event);
@@ -96,6 +109,8 @@ public class TestATSV15HistoryLoggingService {
@Test(timeout=2000)
public void testDAGGroupingDisabled() throws Exception {
ATSV15HistoryLoggingService service = createService(1);
+ service.start();
+
TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
service.handle(event);
@@ -123,6 +138,8 @@ public class TestATSV15HistoryLoggingService {
public void testDAGGroupingGroupingEnabled() throws Exception {
int numDagsPerGroup = 100;
ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+ service.start();
+
TezDAGID dagId1 = TezDAGID.getInstance(appId, 1);
for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
service.handle(event);
@@ -172,14 +189,243 @@ public class TestATSV15HistoryLoggingService {
service.stop();
}
+ @Test
+ public void testNonSessionDomains() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenReturn(Collections.singletonMap(
+ TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+ service.start();
+
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // All calls made with session domain id.
+ verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+ assertTrue(timelineClient.entityLog.size() > 0);
+
+ service.stop();
+ }
+
+ @Test
+ public void testNonSessionDomainsFailed() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenThrow(new IOException());
+
+ service.start();
+
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // History logging is disabled.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+ assertEquals(0, timelineClient.entityLog.size());
+
+ service.stop();
+ }
+
+ @Test
+ public void testNonSessionDomainsAclNull() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenReturn(null);
+
+ service.start();
+
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No domain updates but history logging happened.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+ assertTrue(timelineClient.entityLog.size() > 0);
+
+ service.stop();
+ }
+
+ @Test
+ public void testSessionDomains() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ when(appContext.isSession()).thenReturn(true);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenReturn(Collections.singletonMap(
+ TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+ service.start();
+
+ // Verify that the session domain was created.
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Mock dag domain creation.
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id"));
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // Verify dag domain was created.
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // calls were made with correct domain ids.
+ verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+ verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id"));
+
+ service.stop();
+ }
+
+ @Test
+ public void testSessionDomainsFailed() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ when(appContext.isSession()).thenReturn(true);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenThrow(new IOException());
+
+ service.start();
+
+ // Verify that the session domain creation was attempted.
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Mock dag domain creation.
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id"));
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domain creation was done.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No history logging calls were done
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+ assertEquals(0, timelineClient.entityLog.size());
+
+ service.stop();
+ }
+
+ @Test
+ public void testSessionDomainsDagFailed() throws Exception {
+ ATSV15HistoryLoggingService service = createService(-1);
+
+ when(appContext.isSession()).thenReturn(true);
+
+ HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+ service.historyACLPolicyManager = historyACLPolicyManager;
+
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+ service.start();
+
+ // Verify that the session domain creation was called.
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+ // Mock dag domain creation.
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+ .thenThrow(new IOException());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+ service.handle(event);
+ }
+ while (!service.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // Verify dag domain creation was called.
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // AM events sent, dag events are not sent.
+ verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("dag-id"));
+ assertEquals(1, timelineClient.entityLog.size());
+
+ service.stop();
+ }
+
private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
- AppContext appContext = mock(AppContext.class);
+ appContext = mock(AppContext.class);
when(appContext.getApplicationID()).thenReturn(appId);
when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
service.setAppContext(appContext);
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(false);
if (numDagsPerGroup != -1) {
conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
numDagsPerGroup);
@@ -191,7 +437,6 @@ public class TestATSV15HistoryLoggingService {
timelineClient.init(conf);
service.timelineClient = timelineClient;
- service.start();
return service;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 6b8d6e5..dc215fd 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.history.logging.ats;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -36,13 +38,17 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezReflectionException;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -54,8 +60,8 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
private static final Logger LOG = LoggerFactory.getLogger(ATSHistoryLoggingService.class);
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
- new LinkedBlockingQueue<DAGHistoryEvent>();
+ @VisibleForTesting
+ LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>();
private Thread eventHandlingThread;
private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -80,7 +86,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
"org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
private static final String atsHistoryACLManagerClassName =
"org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
- private HistoryACLPolicyManager historyACLPolicyManager;
+
+ @VisibleForTesting
+ HistoryACLPolicyManager historyACLPolicyManager;
public ATSHistoryLoggingService() {
super(ATSHistoryLoggingService.class.getName());
@@ -121,7 +129,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (maxTimeToWaitOnShutdown < 0) {
waitForeverOnShutdown = true;
}
- sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
+ "maxEventsPerBatch=" + maxEventsPerBatch
@@ -150,8 +157,17 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (!historyLoggingEnabled || timelineClient == null) {
return;
}
+
timelineClient.start();
+ try {
+ sessionDomainId = createSessionDomain();
+ } catch (HistoryACLPolicyException | IOException e) {
+ LOG.warn("Could not setup history acls, disabling history logging.", e);
+ historyLoggingEnabled = false;
+ return;
+ }
+
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -200,9 +216,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
@Override
public void serviceStop() {
- if (!historyLoggingEnabled || timelineClient == null) {
- return;
- }
LOG.info("Stopping ATSService"
+ ", eventQueueBacklog=" + eventQueue.size());
stopped.set(true);
@@ -242,7 +255,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+ ", eventQueueBacklog=" + eventQueue.size());
}
- timelineClient.stop();
+ if (timelineClient != null) {
+ timelineClient.stop();
+ }
if (historyACLPolicyManager != null) {
historyACLPolicyManager.close();
}
@@ -289,13 +304,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
skippedDAGs.add(dagId);
return false;
}
- if (historyACLPolicyManager != null) {
- String dagDomainId = dagSubmittedEvent.getConf().get(
- TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
- if (dagDomainId != null) {
- dagDomainIdMap.put(dagId, dagDomainId);
- }
- }
}
if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent();
@@ -321,33 +329,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
}
private void handleEvents(List<DAGHistoryEvent> events) {
- TimelineEntity[] entities = new TimelineEntity[events.size()];
- for (int i = 0; i < events.size(); ++i) {
- DAGHistoryEvent event = events.get(i);
- String domainId = sessionDomainId;
-
- TezDAGID dagId = event.getDagID();
-
- if (historyACLPolicyManager != null && dagId != null) {
- if (dagDomainIdMap.containsKey(dagId)) {
- domainId = dagDomainIdMap.get(dagId);
- }
+ List<TimelineEntity> entities = new ArrayList<>(events.size());
+ for (DAGHistoryEvent event : events) {
+ String domainId = getDomainForEvent(event);
+ // skippedDags is updated in the above call so check again.
+ if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+ continue;
}
-
- entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
-
- if (historyACLPolicyManager != null) {
- if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
- if (domainId != null && !domainId.isEmpty()) {
- historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId);
- }
- } else {
- if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
- historyACLPolicyManager.updateTimelineEntityDomain(entities[i], sessionDomainId);
- }
- }
+ TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+ event.getHistoryEvent());
+ entities.add(entity);
+ if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+ historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
}
-
}
if (LOG.isDebugEnabled()) {
@@ -355,7 +349,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
}
try {
TimelinePutResponse response =
- timelineClient.putEntities(entities);
+ timelineClient.putEntities(entities.toArray(new TimelineEntity[entities.size()]));
if (response != null
&& !response.getErrors().isEmpty()) {
int count = response.getErrors().size();
@@ -375,4 +369,97 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
}
}
+ private String getDomainForEvent(DAGHistoryEvent event) {
+ String domainId = sessionDomainId;
+ if (historyACLPolicyManager == null) {
+ return domainId;
+ }
+
+ TezDAGID dagId = event.getDagID();
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+ return domainId;
+ }
+
+ if (dagDomainIdMap.containsKey(dagId)) {
+ // If we already have the domain for the dag id return it
+ domainId = dagDomainIdMap.get(dagId);
+ // Cleanup if this is the last event.
+ if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+ dagDomainIdMap.remove(dagId);
+ }
+ } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+ || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+ // In case this is the first event for the dag, create and populate dag domain.
+ Configuration conf;
+ DAGPlan dagPlan;
+ if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+ conf = ((DAGSubmittedEvent)historyEvent).getConf();
+ dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+ } else {
+ conf = appContext.getCurrentDAG().getConf();
+ dagPlan = appContext.getCurrentDAG().getJobPlan();
+ }
+ domainId = createDagDomain(conf, dagPlan, dagId);
+
+ // createDagDomain updates skippedDAGs so another check here.
+ if (skippedDAGs.contains(dagId)) {
+ return null;
+ }
+
+ dagDomainIdMap.put(dagId, domainId);
+ }
+ return domainId;
+ }
+
+ /**
+ * Creates a domain for the session.
+ * @return domainId to be used. null if acls are disabled.
+ * @throws HistoryACLPolicyException, IOException Forwarded from historyACLPolicyManager.
+ */
+ private String createSessionDomain() throws HistoryACLPolicyException, IOException {
+ if (historyACLPolicyManager == null) {
+ return null;
+ }
+ Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(),
+ appContext.getApplicationID());
+ if (domainInfo != null) {
+ return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+ }
+ return null;
+ }
+
+ /**
+ * When running in session mode, create a domain for the dag and return it.
+ * @param dagConf The configuration of the dag for which the domain has to be created.
+ * @param dagPlan The dag plan which contains the ACLs.
+ * @param dagId The dagId for which domain has to be created.
+ * @return The created domain id on success.
+ * null: If there is a failure; history logging is also disabled for this dag.
+ * sessionDomainId: If historyACLPolicyManager returns null.
+ */
+ private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) {
+ // In non session mode dag domain is same as session domain id.
+ if (!appContext.isSession()) {
+ return sessionDomainId;
+ }
+ DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+ ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+ : null;
+ try {
+ Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs(
+ dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()),
+ dagAccessControls);
+ if (domainInfo != null) {
+ return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+ }
+ // Fallback to session domain, if domainInfo was null
+ return sessionDomainId;
+ } catch (IOException | HistoryACLPolicyException e) {
+ LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e);
+ skippedDAGs.add(dagId);
+ // Return value is not used, check for skippedDAG is important.
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 464864e..da57eb2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -21,16 +21,30 @@ package org.apache.tez.dag.history.logging.ats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -39,9 +53,19 @@ import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class TestATSHistoryLoggingService {
private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryLoggingService.class);
@@ -51,11 +75,15 @@ public class TestATSHistoryLoggingService {
private Configuration conf;
private int atsInvokeCounter;
private int atsEntitiesCounter;
+ private HistoryACLPolicyManager historyACLPolicyManager;
private SystemClock clock = new SystemClock();
+ private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
@Before
public void setup() throws Exception {
appContext = mock(AppContext.class);
+ historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
atsHistoryLoggingService = new ATSHistoryLoggingService();
atsHistoryLoggingService.setAppContext(appContext);
conf = new Configuration(false);
@@ -63,13 +91,16 @@ public class TestATSHistoryLoggingService {
1000l);
conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+ conf.set(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain");
atsInvokeCounter = 0;
atsEntitiesCounter = 0;
atsHistoryLoggingService.init(conf);
+ atsHistoryLoggingService.historyACLPolicyManager = historyACLPolicyManager;
atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
- atsHistoryLoggingService.start();
+
when(appContext.getClock()).thenReturn(clock);
when(appContext.getCurrentDAGID()).thenReturn(null);
+ when(appContext.getApplicationID()).thenReturn(appId);
when(atsHistoryLoggingService.timelineClient.putEntities(
Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
new Answer<Object>() {
@@ -96,6 +127,7 @@ public class TestATSHistoryLoggingService {
@Test(timeout=20000)
public void testATSHistoryLoggingServiceShutdown() {
+ atsHistoryLoggingService.start();
TezDAGID tezDAGID = TezDAGID.getInstance(
ApplicationId.newInstance(100l, 1), 1);
DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -122,6 +154,7 @@ public class TestATSHistoryLoggingService {
@Test(timeout=20000)
public void testATSEventBatching() {
+ atsHistoryLoggingService.start();
TezDAGID tezDAGID = TezDAGID.getInstance(
ApplicationId.newInstance(100l, 1), 1);
DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -145,9 +178,8 @@ public class TestATSHistoryLoggingService {
@Test(timeout=20000)
public void testTimelineServiceDisable() throws Exception {
+ atsHistoryLoggingService.start();
ATSHistoryLoggingService atsHistoryLoggingService1;
- AppContext appContext1;
- appContext1 = mock(AppContext.class);
atsHistoryLoggingService1 = new ATSHistoryLoggingService();
atsHistoryLoggingService1.setAppContext(appContext);
@@ -160,7 +192,7 @@ public class TestATSHistoryLoggingService {
++atsInvokeCounter;
atsEntitiesCounter += invocation.getArguments().length;
try {
- Thread.sleep(500l);
+ Thread.sleep(10l);
} catch (InterruptedException e) {
// do nothing
}
@@ -181,7 +213,7 @@ public class TestATSHistoryLoggingService {
}
try {
- Thread.sleep(1000l);
+ Thread.sleep(20l);
} catch (InterruptedException e) {
// Do nothing
}
@@ -190,6 +222,238 @@ public class TestATSHistoryLoggingService {
Assert.assertEquals(atsInvokeCounter, 0);
Assert.assertEquals(atsEntitiesCounter, 0);
Assert.assertNull(atsHistoryLoggingService1.timelineClient);
-
+ atsHistoryLoggingService1.close();
+ }
+
+ @Test(timeout=10000)
+ public void testNonSessionDomains() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+ (Configuration)any(), (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ Thread.sleep(2500);
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // All calls made with session domain id.
+ verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+ }
+
+ @Test(timeout=10000)
+ public void testNonSessionDomainsFailed() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenThrow(new IOException());
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+ (Configuration)any(), (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(1000);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No calls were made with the session domain id.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+ Assert.assertEquals(0, atsEntitiesCounter);
+ }
+
+ @Test(timeout=10000)
+ public void testNonSessionDomainsAclNull() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenReturn(null);
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+ (Configuration)any(), (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ Thread.sleep(2500);
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No calls were made with the session domain id.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+ Assert.assertEquals(5, atsEntitiesCounter);
+ }
+
+ @Test(timeout=10000)
+ public void testSessionDomains() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain"));
+
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain"));
+
+ when(appContext.isSession()).thenReturn(true);
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+ (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ Thread.sleep(2500);
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // The dag domain was set up exactly once.
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // One call was made with the session domain id and four with the dag domain id.
+ verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain"));
+ verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain"));
+ }
+
+ @Test(timeout=10000)
+ public void testSessionDomainsFailed() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenThrow(new IOException());
+
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+ .thenReturn(
+ Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain"));
+
+ when(appContext.isSession()).thenReturn(true);
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+ (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(1000);
+ }
+ // No dag domains were created.
+ verify(historyACLPolicyManager, times(0))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No calls were made for domains.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+ Assert.assertEquals(0, atsEntitiesCounter);
+ }
+
+ @Test(timeout=10000)
+ public void testSessionDomainsDagFailed() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenReturn(Collections.singletonMap(
+ TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-domain"));
+
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+ .thenThrow(new IOException());
+
+ when(appContext.isSession()).thenReturn(true);
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+ (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ Thread.sleep(2500);
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // DAG domain was called once.
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // Exactly one call was made, and it used the session domain id.
+ verify(historyACLPolicyManager, times(1))
+ .updateTimelineEntityDomain(any(), eq("session-domain"));
+ verify(historyACLPolicyManager, times(1))
+ .updateTimelineEntityDomain(any(), (String)any());
+ Assert.assertEquals(1, atsEntitiesCounter);
+ }
+
+ @Test(timeout=10000)
+ public void testSessionDomainsAclNull() throws Exception {
+ when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+ .thenReturn(null);
+
+ when(historyACLPolicyManager.setupSessionDAGACLs(
+ (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+ .thenReturn(null);
+
+ when(appContext.isSession()).thenReturn(true);
+ atsHistoryLoggingService.start();
+ verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+ (ApplicationId)any());
+
+ // Send the event and wait for completion.
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+ atsHistoryLoggingService.handle(event);
+ }
+ Thread.sleep(2500);
+ while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ // Dag domain setup was attempted exactly once.
+ verify(historyACLPolicyManager, times(1))
+ .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+ // No domain id was applied to any entity.
+ verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+ Assert.assertEquals(5, atsEntitiesCounter);
+ }
+
+ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+ ATSHistoryLoggingService service) {
+ List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+ long time = System.currentTimeMillis();
+ Configuration conf = new Configuration(service.getConfig());
+ historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user")));
+ historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
+ DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
+ TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+ historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
+ TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+ historyEvents
+ .add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+ ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
+ null, null)));
+ return historyEvents;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b0bacf4..3be7131 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -663,7 +663,8 @@ public class MRRSleepJob extends Configured implements Tool {
" [-irt intermediateReduceSleepTime]" +
" [-recordt recordSleepTime (msec)]" +
" [-generateSplitsInAM (false)/true]" +
- " [-writeSplitsToDfs (false)/true]");
+ " [-writeSplitsToDfs (false)/true]" +
+ " [-numDags numDagsToSubmit");
ToolRunner.printGenericCommandUsage(System.err);
return 2;
}
@@ -676,6 +677,8 @@ public class MRRSleepJob extends Configured implements Tool {
boolean writeSplitsToDfs = false;
boolean generateSplitsInAM = false;
boolean splitsOptionFound = false;
+ boolean isSession = false;
+ int numDags = 1;
for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) {
@@ -716,6 +719,12 @@ public class MRRSleepJob extends Configured implements Tool {
}
splitsOptionFound = true;
writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+ } else if (args[i].equals("-numDags")) {
+ numDags = Integer.parseInt(args[++i]);
+ if (numDags < 1) {
+ throw new RuntimeException("numDags should be positive");
+ }
+ isSession = numDags > 1;
}
}
@@ -747,13 +756,20 @@ public class MRRSleepJob extends Configured implements Tool {
mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
- TezClient tezSession = TezClient.create("MRRSleep", conf, false, null, credentials);
+ TezClient tezSession = TezClient.create("MRRSleep", conf, isSession, null, credentials);
tezSession.start();
- DAGClient dagClient = tezSession.submitDAG(dag);
- dagClient.waitForCompletion();
- tezSession.stop();
-
- return dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED) ? 0 : 1;
+ try {
+ for (; numDags > 0; --numDags) {
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ dagClient.waitForCompletion();
+ if (!dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED)) {
+ return 1;
+ }
+ }
+ } finally {
+ tezSession.stop();
+ }
+ return 0;
}
}
[2/2] tez git commit: TEZ-3404. Move blocking call for YARN Timeline
domain creation from client side to AM. (Harish Jaiprakash via hitesh)
Posted by hi...@apache.org.
TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a23de498
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a23de498
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a23de498
Branch: refs/heads/master
Commit: a23de4982e4ed0d55fb711745e7670b3be4b266e
Parents: c07ec7b
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Sep 12 13:29:22 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Sep 12 13:29:22 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 89 +-----
.../org/apache/tez/client/TezClientUtils.java | 69 +----
.../apache/tez/common/security/ACLManager.java | 27 +-
.../tez/common/security/DAGAccessControls.java | 43 ++-
.../security/HistoryACLPolicyManager.java | 30 +-
.../main/java/org/apache/tez/dag/api/DAG.java | 32 +--
.../apache/tez/dag/api/DagTypeConverters.java | 25 ++
tez-api/src/main/proto/DAGApiRecords.proto | 8 +
.../org/apache/tez/client/TestTezClient.java | 6 +-
.../apache/tez/client/TestTezClientUtils.java | 18 +-
.../tez/common/security/TestACLManager.java | 24 +-
.../common/security/TestDAGAccessControls.java | 167 ++++++-----
.../org/apache/tez/dag/api/TestDAGPlan.java | 25 +-
.../org/apache/tez/dag/api/TestDAGVerify.java | 24 +-
.../tez/dag/api/TestDagTypeConverters.java | 38 +++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +-
.../ats/acls/ATSHistoryACLPolicyManager.java | 16 +-
.../ats/acls/TestATSHistoryWithACLs.java | 202 +-------------
.../ats/acls/ATSV15HistoryACLPolicyManager.java | 17 +-
.../ats/ATSV15HistoryLoggingService.java | 154 ++++++++---
.../dag/history/ats/acls/TestATSHistoryV15.java | 2 +
.../ats/TestATSV15HistoryLoggingService.java | 251 ++++++++++++++++-
.../logging/ats/ATSHistoryLoggingService.java | 171 +++++++++---
.../ats/TestATSHistoryLoggingService.java | 276 ++++++++++++++++++-
.../tez/mapreduce/examples/MRRSleepJob.java | 30 +-
26 files changed, 1127 insertions(+), 620 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd6ab68..99bae83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.
TEZ-3284. Synchronization for every write in UnorderdKVWriter
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 71ba6b2..780fcb7 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.util.Time;
import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -69,7 +68,6 @@ import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -79,7 +77,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.common.security.HistoryACLPolicyException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -136,11 +133,9 @@ public class TezClient {
final TezApiVersionInfo apiVersionInfo;
@VisibleForTesting
final ServicePluginsDescriptor servicePluginsDescriptor;
- private HistoryACLPolicyManager historyACLPolicyManager;
private JavaOptsChecker javaOptsChecker = null;
private int preWarmDAGCounter = 0;
- private int dagCounter = 0;
/* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
private int maxSubmitDAGRequestSizeThroughIPC;
@@ -148,15 +143,6 @@ public class TezClient {
private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
private FileSystem stagingFs = null;
- private static final String atsHistoryLoggingServiceClassName =
- "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
- private static final String atsHistoryACLManagerClassName =
- "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
- private static final String atsv15HistoryLoggingServiceClassName =
- "org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService";
- private static final String atsV15HistoryACLManagerClassName =
- "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
-
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
@@ -375,12 +361,6 @@ public class TezClient {
historyLogLevel);
}
- @Private
- @VisibleForTesting
- public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) {
- historyACLPolicyManager = myAclPolicyManager;
- }
-
/**
* Start the client. This establishes a connection to the YARN cluster.
* In session mode, this start the App Master thats runs all the DAGs in the
@@ -395,40 +375,6 @@ public class TezClient {
frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
frameworkClient.start();
- ///need additional check for historyACLPolicyManager because tests could stub historyACLPolicyManager
- ///before tezclient start. If there is already a stubbed historyACLPolicyManager, we don't overwrite it
- if (historyACLPolicyManager == null) {
- //TODO: FIXME: The ACL manager should be retrieved either from the
- //logging service directly or via a pluggable factory that can
- //instantiate ACL managers and logging services
- String logSvcClassName = amConfig.getTezConfiguration().get(
- TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
- String aclMgrClassName = null;
- if (logSvcClassName.equals(atsHistoryLoggingServiceClassName)) {
- aclMgrClassName = atsHistoryACLManagerClassName;
- } else if (logSvcClassName.equals(
- atsv15HistoryLoggingServiceClassName)) {
- aclMgrClassName = atsV15HistoryACLManagerClassName;
- }
- if (aclMgrClassName != null) {
- LOG.info("Using " + aclMgrClassName + " to manage Timeline ACLs");
- try {
- historyACLPolicyManager = ReflectionUtils.createClazzInstance(
- aclMgrClassName);
- historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
- } catch (TezReflectionException e) {
- if (!amConfig.getTezConfiguration().getBoolean(
- TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
- TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
- LOG.warn("Could not instantiate object for " + aclMgrClassName
- + ". ACLs cannot be enforced correctly for history data in Timeline", e);
- throw e;
- }
- historyACLPolicyManager = null;
- }
- }
- }
-
if (this.amConfig.getTezConfiguration().getBoolean(
TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) {
@@ -476,7 +422,7 @@ public class TezClient {
sessionAppId,
null, clientName, amConfig,
tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
- historyACLPolicyManager, servicePluginsDescriptor, javaOptsChecker);
+ servicePluginsDescriptor, javaOptsChecker);
// Set Tez Sessions to not retry on AM crashes if recovery is disabled
if (!amConfig.getTezConfiguration().getBoolean(
@@ -511,7 +457,6 @@ public class TezClient {
* if submission timed out
*/
public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
- ++dagCounter;
if (isSession) {
return submitDAGSession(dag);
} else {
@@ -545,32 +490,9 @@ public class TezClient {
}
TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration());
- Map<String, String> aclConfigs = null;
- // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable logging of all
- // dags within a session
- boolean sessionHistoryLoggingEnabled = amConfig.getTezConfiguration().getBoolean(
- TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
- TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
- if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) {
- try {
- aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
- amConfig.getTezConfiguration(), sessionAppId,
- Integer.toString(dagCounter), dag.getDagAccessControls());
- } catch (HistoryACLPolicyException e) {
- LOG.warn("Disabling history logging for dag " +
- dag.getName() + " due to error in setting up history acls " + e);
- dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
- dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false);
- }
- } else if (!sessionHistoryLoggingEnabled) {
- dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
- dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false);
- }
-
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
- usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor,
- javaOptsChecker);
+ usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
requestBuilder.setDAGPlan(dagPlan);
@@ -644,10 +566,6 @@ public class TezClient {
*/
public synchronized void stop() throws TezException, IOException {
try {
- if (historyACLPolicyManager != null) {
- historyACLPolicyManager.close();
- }
-
if (sessionStarted) {
LOG.info("Shutting down Tez Session"
+ ", sessionName=" + clientName
@@ -1032,8 +950,7 @@ public class TezClient {
ApplicationSubmissionContext appContext = TezClientUtils
.createApplicationSubmissionContext(
appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
- usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager,
- servicePluginsDescriptor, javaOptsChecker);
+ usingTezArchiveDeploy, apiVersionInfo, servicePluginsDescriptor, javaOptsChecker);
String callerContextStr = "";
if (dag.getCallerContext() != null) {
callerContextStr = ", callerContext=" + dag.getCallerContext().contextAsSimpleString();
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index eb1a95e..f440b6f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -86,8 +86,6 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.security.ACLManager;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
-import org.apache.tez.common.security.HistoryACLPolicyException;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
@@ -445,7 +443,6 @@ public class TezClientUtils {
* @param amConfig AM Configuration
* @param tezJarResources Resources to be used by the AM
* @param sessionCreds the credential object which will be populated with session specific
- * @param historyACLPolicyManager
* @param servicePluginsDescriptor descriptor for services which may be running in the AM
* @return an ApplicationSubmissionContext to launch a Tez AM
* @throws IOException
@@ -457,7 +454,7 @@ public class TezClientUtils {
ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
Credentials sessionCreds, boolean tezLrsAsArchive,
- TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager,
+ TezApiVersionInfo apiVersionInfo,
ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
throws IOException, YarnException {
@@ -565,41 +562,19 @@ public class TezClientUtils {
}
amLocalResources.putAll(tezJarResources);
- // Setup Session ACLs and update conf as needed
- Map<String, String> aclConfigs = null;
- if (historyACLPolicyManager != null) {
- if (dag == null) {
- try{
- aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
- appId);
- } catch (HistoryACLPolicyException e) {
- LOG.warn("Disabling history logging for session " + strAppId +
- " due to error in setting up history acls " + e);
- amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
- false);
- }
- } else {
- try{
- // Non-session mode
- // As only a single DAG is support, we should combine AM and DAG ACLs under the same
- // acl management layer
- aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
- appId, dag.getDagAccessControls());
- } catch (HistoryACLPolicyException e) {
- LOG.warn("Disabling history logging for dag " +
- dag.getName() + " due to error in setting up history acls " + e);
- dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
- // This is non-session mode so disable logging for whole AM
- amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
- false);
- }
- }
+ TezConfiguration tezConf = amConfig.getTezConfiguration();
+ // Merge the dag access controls into tez am config.
+ if (dag != null && dag.getDagAccessControls() != null) {
+ // Merge updates the conf object passed in. In non-session mode the same client object can be
+ // used to submit multiple dags, so copy the conf to keep one DAG's ACLs out of another's.
+ tezConf = new TezConfiguration(amConfig.getTezConfiguration());
+ dag.getDagAccessControls().mergeIntoAmAcls(tezConf);
}
// emit conf as PB file
- ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
- aclConfigs, servicePluginsDescriptor);
-
+ ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
+ servicePluginsDescriptor);
+
FSDataOutputStream amConfPBOutBinaryStream = null;
try {
amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
@@ -737,20 +712,10 @@ public class TezClientUtils {
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor,
JavaOptsChecker javaOptsChecker) throws IOException {
- return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
- null, servicePluginsDescriptor, javaOptsChecker);
- }
-
- static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
- Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
- Credentials credentials, Map<String, String> additionalDAGConfigs,
- ServicePluginsDescriptor servicePluginsDescriptor,
- JavaOptsChecker javaOptsChecker) throws IOException {
Credentials dagCredentials = setupDAGCredentials(dag, credentials,
amConfig.getTezConfiguration());
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
- amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor,
- javaOptsChecker);
+ amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker);
}
static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -829,7 +794,7 @@ public class TezClientUtils {
}
static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
- Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) {
+ ServicePluginsDescriptor servicePluginsDescriptor) {
assert amConf != null;
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
for (Entry<String, String> entry : amConf) {
@@ -838,14 +803,6 @@ public class TezClientUtils {
kvp.setValue(amConf.get(entry.getKey()));
builder.addConfKeyValues(kvp);
}
- if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
- for (Entry<String, String> entry : additionalConfigs.entrySet()) {
- PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
- kvp.setKey(entry.getKey());
- kvp.setValue(entry.getValue());
- builder.addConfKeyValues(kvp);
- }
- }
AMPluginDescriptorProto pluginDescriptorProto =
DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
index e1c7314..08680a5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
@@ -26,15 +26,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
/**
* Class to manage ACLs for the Tez AM and DAGs and provides functionality to check whether
@@ -42,8 +42,6 @@ import com.google.common.annotations.VisibleForTesting;
*/
@Private
public class ACLManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(ACLManager.class);
public static final String WILDCARD_ACL_VALUE = "*";
private final String dagUser;
@@ -75,7 +73,7 @@ public class ACLManager {
}
}
- public ACLManager(ACLManager amACLManager, String dagUser, Configuration dagConf) {
+ public ACLManager(ACLManager amACLManager, String dagUser, ACLInfo aclInfo) {
this.amUser = amACLManager.amUser;
this.dagUser = dagUser;
this.users = amACLManager.users;
@@ -84,12 +82,21 @@ public class ACLManager {
if (!aclsEnabled) {
return;
}
- ACLConfigurationParser parser = new ACLConfigurationParser(dagConf, true);
- if (parser.getAllowedUsers() != null) {
- this.users.putAll(parser.getAllowedUsers());
+ if (aclInfo.getUsersWithViewAccessCount() > 0) {
+ this.users.put(ACLType.DAG_VIEW_ACL,
+ Sets.newLinkedHashSet(aclInfo.getUsersWithViewAccessList()));
}
- if (parser.getAllowedGroups() != null) {
- this.groups.putAll(parser.getAllowedGroups());
+ if (aclInfo.getUsersWithModifyAccessCount() > 0) {
+ this.users.put(ACLType.DAG_MODIFY_ACL,
+ Sets.newLinkedHashSet(aclInfo.getUsersWithModifyAccessList()));
+ }
+ if (aclInfo.getGroupsWithViewAccessCount() > 0) {
+ this.groups.put(ACLType.DAG_VIEW_ACL,
+ Sets.newLinkedHashSet(aclInfo.getGroupsWithViewAccessList()));
+ }
+ if (aclInfo.getGroupsWithModifyAccessCount() > 0) {
+ this.groups.put(ACLType.DAG_MODIFY_ACL,
+ Sets.newLinkedHashSet(aclInfo.getGroupsWithModifyAccessList()));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
index 5fe352a..520416d 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
@@ -27,8 +27,11 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import com.google.common.collect.ImmutableMap;
+
/**
* Access controls for the DAG
*/
@@ -150,23 +153,37 @@ public class DAGAccessControls {
return Collections.unmodifiableSet(groupsWithModifyACLs);
}
+ /**
+ * Merge the dag acls with the AM acls in the configuration object. The config object will contain
+ * the updated acls.
+ * @param conf The AM config.
+ */
@Private
- public synchronized void serializeToConfiguration(Configuration conf) {
- if (usersWithViewACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
- conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE);
+ public synchronized void mergeIntoAmAcls(Configuration conf) {
+ ACLConfigurationParser parser = new ACLConfigurationParser(conf, false);
+ parser.addAllowedGroups(ImmutableMap.of(
+ ACLType.AM_VIEW_ACL, groupsWithViewACLs, ACLType.AM_MODIFY_ACL, groupsWithModifyACLs));
+ parser.addAllowedUsers(ImmutableMap.of(
+ ACLType.AM_VIEW_ACL, usersWithViewACLs, ACLType.AM_MODIFY_ACL, usersWithModifyACLs));
+
+ Set<String> viewUsers = parser.getAllowedUsers().get(ACLType.AM_VIEW_ACL);
+ Set<String> viewGroups = parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL);
+ if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE);
} else {
- String userList = ACLManager.toCommaSeparatedString(usersWithViewACLs);
- String groupList = ACLManager.toCommaSeparatedString(groupsWithViewACLs);
- conf.set(TezConstants.TEZ_DAG_VIEW_ACLS,
- userList + " " + groupList);
+ String userList = ACLManager.toCommaSeparatedString(viewUsers);
+ String groupList = ACLManager.toCommaSeparatedString(viewGroups);
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, userList + " " + groupList);
}
- if (usersWithModifyACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
- conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE);
+
+ Set<String> modifyUsers = parser.getAllowedUsers().get(ACLType.AM_MODIFY_ACL);
+ Set<String> modifyGroups = parser.getAllowedGroups().get(ACLType.AM_MODIFY_ACL);
+ if (modifyUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE);
} else {
- String userList = ACLManager.toCommaSeparatedString(usersWithModifyACLs);
- String groupList = ACLManager.toCommaSeparatedString(groupsWithModifyACLs);
- conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, userList + " " + groupList);
+ String userList = ACLManager.toCommaSeparatedString(modifyUsers);
+ String groupList = ACLManager.toCommaSeparatedString(modifyGroups);
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, userList + " " + groupList);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
index fc0f57c..92eea67 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -30,15 +30,21 @@ import org.apache.tez.common.security.HistoryACLPolicyException;
/**
* ACL Policy Manager
- * An instance of this implements any ACL related activity when starting a session or
- * submitting a DAG
+ * An instance of this implements any ACL related activity when starting a session or submitting a
+ * DAG. It is used in the HistoryLoggingService to create domain ids and populate entities with
+ * domain id.
*/
@Unstable
@Private
public interface HistoryACLPolicyManager extends Configurable {
/**
- * Take any necessary steps for setting up Session ACLs
+ * Take any necessary steps for setting up both Session ACLs and non session acls. This is called
+ * with the am configuration which contains the ACL information to be used to create a domain.
+ * If the method returns a value, then it is assumed to be a valid domain and is used as domainId.
+ * If the method returns null, acls are disabled at session level, i.e. use default acls at session
+ * level.
+ * If the method throws an Exception, history logging is disabled for the entire session.
* @param conf Configuration
* @param applicationId Application ID for the session
* @throws Exception
@@ -47,7 +53,7 @@ public interface HistoryACLPolicyManager extends Configurable {
throws IOException, HistoryACLPolicyException;
/**
- * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode
+ * Not used currently.
* @param conf Configuration
* @param applicationId Application ID for the AM
* @param dagAccessControls ACLs defined for the DAG being submitted
@@ -57,16 +63,26 @@ public interface HistoryACLPolicyManager extends Configurable {
DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
/**
- * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session
+ * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session. This is
+ * called with dag configuration.
+ * If the method returns a value, then it is assumed to be a valid domain and is used as a domainId
+ * for all of the dag events.
+ * If the method returns null, it falls back to session level acls.
+ * If the method throws an Exception, history logging is disabled for the dag events.
* @param conf Configuration
* @param applicationId Application ID for the AM
* @param dagAccessControls ACLs defined for the DAG being submitted
* @throws Exception
*/
public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
- String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
-
+ String dagName, DAGAccessControls dagAccessControls)
+ throws IOException, HistoryACLPolicyException;
+ /**
+ * Called with a timeline entity which has to be updated with a domain id.
+ * @param timelineEntity The timeline entity which will be published.
+ * @param domainId The domainId returned by one of the setup*ACL calls.
+ */
public void updateTimelineEntityDomain(Object timelineEntity, String domainId);
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 65321a8..f15c1fb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -770,15 +769,15 @@ public class DAG {
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive) {
return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
- null, null, null);
+ null, null);
}
// create protobuf message describing DAG
@Private
public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
- boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
- ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) {
+ boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor,
+ JavaOptsChecker javaOptsChecker) {
Deque<String> topologicalVertexStack = verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -1017,30 +1016,11 @@ public class DAG {
dagBuilder.addEdge(edgeBuilder);
}
- ConfigurationProto.Builder confProtoBuilder =
- ConfigurationProto.newBuilder();
if (dagAccessControls != null) {
- Configuration aclConf = new Configuration(false);
- dagAccessControls.serializeToConfiguration(aclConf);
- Iterator<Entry<String, String>> aclConfIter = aclConf.iterator();
- while (aclConfIter.hasNext()) {
- Entry<String, String> entry = aclConfIter.next();
- PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
- kvp.setKey(entry.getKey());
- kvp.setValue(entry.getValue());
- TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
- confProtoBuilder.addConfKeyValues(kvp);
- }
- }
- if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
- for (Entry<String, String> entry : additionalConfigs.entrySet()) {
- PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
- kvp.setKey(entry.getKey());
- kvp.setValue(entry.getValue());
- TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
- confProtoBuilder.addConfKeyValues(kvp);
- }
+ dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls));
}
+
+ ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
if (!this.dagConf.isEmpty()) {
for (Entry<String, String> entry : this.dagConf.entrySet()) {
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 5733da8..cefe026 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -57,6 +58,7 @@ import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -863,4 +865,27 @@ public class DagTypeConverters {
return callerContext;
}
+ public static ACLInfo convertDAGAccessControlsToProto(DAGAccessControls dagAccessControls) { // Builds an ACLInfo proto from the DAG's access controls.
+ if (dagAccessControls == null) {
+ return null; // No access controls given: there is nothing to convert, so null is returned.
+ }
+ ACLInfo.Builder builder = ACLInfo.newBuilder();
+ builder.addAllUsersWithViewAccess(dagAccessControls.getUsersWithViewACLs()); // Copy each of the four ACL lists into its matching repeated field.
+ builder.addAllUsersWithModifyAccess(dagAccessControls.getUsersWithModifyACLs());
+ builder.addAllGroupsWithViewAccess(dagAccessControls.getGroupsWithViewACLs());
+ builder.addAllGroupsWithModifyAccess(dagAccessControls.getGroupsWithModifyACLs());
+ return builder.build();
+ }
+
+ public static DAGAccessControls convertDAGAccessControlsFromProto(ACLInfo aclInfo) { // Rebuilds a DAGAccessControls object from an ACLInfo proto.
+ if (aclInfo == null) {
+ return null; // No proto given: there is nothing to convert, so null is returned.
+ }
+ DAGAccessControls dagAccessControls = new DAGAccessControls();
+ dagAccessControls.setUsersWithViewACLs(aclInfo.getUsersWithViewAccessList()); // Copy each repeated field into the matching user/group ACL setter.
+ dagAccessControls.setUsersWithModifyACLs(aclInfo.getUsersWithModifyAccessList());
+ dagAccessControls.setGroupsWithViewACLs(aclInfo.getGroupsWithViewAccessList());
+ dagAccessControls.setGroupsWithModifyACLs(aclInfo.getGroupsWithModifyAccessList());
+ return dagAccessControls;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index d016d60..c84094b 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -197,6 +197,13 @@ message CallerContextProto {
optional string blob = 4;
}
+message ACLInfo { // Per-DAG access-control lists; carried as the optional aclInfo field of DAGPlan.
+ repeated string usersWithViewAccess = 1; // Users with view access to the DAG.
+ repeated string usersWithModifyAccess = 2; // Users with modify access to the DAG.
+ repeated string groupsWithViewAccess = 3; // Groups with view access to the DAG.
+ repeated string groupsWithModifyAccess = 4; // Groups with modify access to the DAG.
+}
+
message DAGPlan {
required string name = 1;
repeated VertexPlan vertex = 2;
@@ -208,6 +215,7 @@ message DAGPlan {
optional string dag_info = 8;
optional VertexExecutionContextProto default_execution_context = 9;
optional CallerContextProto caller_context = 10;
+ optional ACLInfo aclInfo = 11;
}
// DAG monitoring messages
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index d49ba48..51f36a3 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -241,12 +240,10 @@ public class TestTezClient {
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null);
- HistoryACLPolicyManager mockAcl = mock(HistoryACLPolicyManager.class);
ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
- client.setUpHistoryAclManager(mockAcl);
client.start();
verify(client.mockYarnClient, times(1)).init((Configuration)any());
verify(client.mockYarnClient, times(1)).start();
@@ -344,9 +341,8 @@ public class TestTezClient {
(ShutdownSessionRequestProto) any());
}
verify(client.mockYarnClient, times(1)).stop();
- verify(mockAcl, times(1)).close();
}
-
+
@Test (timeout=5000)
public void testPreWarm() throws Exception {
TezClientForTest client = configureAndCreateTezClient();
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 2c69d77..49aae20 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileNotFoundException;
@@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
@@ -133,7 +131,7 @@ public class TestTezClientUtils {
/**
*
*/
- @Test (timeout=5000)
+ @Test (timeout=10000)
public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws Exception {
URL[] cp = ((URLClassLoader)ClassLoader.getSystemClassLoader()).getURLs();
StringBuffer buffer = new StringBuffer();
@@ -314,7 +312,7 @@ public class TestTezClientUtils {
appId, null, "dagname",
amConf, m,
credentials, false,
- new TezApiVersionInfo(), null, null, null);
+ new TezApiVersionInfo(), null, null);
assertEquals(testpriority, appcontext.getPriority().getPriority());
}
@@ -366,7 +364,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class), null, null);
+ null, null);
ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
@@ -399,7 +397,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class), null, null);
+ null, null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -439,7 +437,7 @@ public class TestTezClientUtils {
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
- mock(HistoryACLPolicyManager.class), null, null);
+ null, null);
List<String> expectedCommands = new LinkedList<String>();
expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -626,7 +624,7 @@ public class TestTezClientUtils {
expected.put("property1", val1);
expected.put("property2", expVal2);
- ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null);
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String v = expected.remove(kvPair.getKey());
@@ -730,7 +728,7 @@ public class TestTezClientUtils {
srcConf.set(entry.getKey(), entry.getValue());
}
- ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null);
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null);
for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
String val = confMap.remove(kvPair.getKey());
@@ -792,7 +790,7 @@ public class TestTezClientUtils {
Configuration conf = new Configuration(false);
ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
- ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null,
+ ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf,
servicePluginsDescriptor);
assertTrue(confProto.hasAmPluginDescriptor());
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
index a88e801..fc9c24e 100644
--- a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
+++ b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
@@ -20,18 +20,16 @@ package org.apache.tez.common.security;
import java.io.IOException;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.collect.Sets;
public class TestACLManager {
@@ -64,7 +62,8 @@ public class TestACLManager {
Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
- ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(), new Configuration(false));
+ ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(),
+ ACLInfo.getDefaultInstance());
user = dagUser;
Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
@@ -256,17 +255,20 @@ public class TestACLManager {
conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyACLs);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, yarnAdminACLs);
- // DAG View ACLs: user1, user4, grp3, grp4.
- String dagViewACLs = "user6, grp5 ";
- // DAG Modify ACLs: user3, grp6, grp7
- String dagModifyACLs = "user6,user5 ";
- conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, dagViewACLs);
- conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, dagModifyACLs);
+ ACLInfo.Builder builder = ACLInfo.newBuilder();
+ // DAG View ACLs: user6, grp5
+ builder.addUsersWithViewAccess("user6");
+ builder.addGroupsWithViewAccess("grp5");
+
+ // DAG Modify ACLs: user6, user7
+ builder.addUsersWithModifyAccess("user6");
+ builder.addUsersWithModifyAccess("user7");
UserGroupInformation dagUser = UserGroupInformation.createUserForTesting("dagUser", noGroups);
ACLManager amAclManager = new ACLManager(currentUser.getShortUserName(), conf);
- ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(), conf);
+ ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(),
+ builder.build());
Assert.assertTrue(aclManager.checkAMViewAccess(currentUser));
Assert.assertFalse(aclManager.checkAMViewAccess(dagUser));
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
index 6afc83b..4335a20 100644
--- a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
+++ b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
@@ -18,76 +18,14 @@
package org.apache.tez.common.security;
-import java.util.Arrays;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.Assert;
import org.junit.Test;
-public class TestDAGAccessControls {
-
- @Test(timeout = 5000)
- public void testBasicSerializeToConf() {
- DAGAccessControls dagAccessControls = new DAGAccessControls();
- dagAccessControls.setUsersWithViewACLs(Arrays.asList("u1"))
- .setUsersWithModifyACLs(Arrays.asList("u2"))
- .setGroupsWithViewACLs(Arrays.asList("g1"))
- .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
- Configuration conf = new Configuration(false);
- dagAccessControls.serializeToConfiguration(conf);
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
- Assert.assertEquals("u1 g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertEquals("u2 g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- }
-
- @Test(timeout = 5000)
- public void testWildCardSerializeToConf() {
- DAGAccessControls dagAccessControls = new DAGAccessControls();
- dagAccessControls.setUsersWithViewACLs(Arrays.asList("*"))
- .setUsersWithModifyACLs(Arrays.asList("*"))
- .setGroupsWithViewACLs(Arrays.asList("g1"))
- .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
- Configuration conf = new Configuration(false);
- dagAccessControls.serializeToConfiguration(conf);
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
- Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- }
-
- @Test(timeout = 5000)
- public void testGroupsOnlySerializeToConf() {
- DAGAccessControls dagAccessControls = new DAGAccessControls();
- dagAccessControls.setGroupsWithViewACLs(Arrays.asList("g1"))
- .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
- Configuration conf = new Configuration(false);
- dagAccessControls.serializeToConfiguration(conf);
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
+import com.google.common.collect.Sets;
- Assert.assertEquals(" g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertEquals(" g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- }
-
- @Test(timeout = 5000)
- public void testEmptySerializeToConf() {
- DAGAccessControls dagAccessControls = new DAGAccessControls();
-
- Configuration conf = new Configuration(false);
- dagAccessControls.serializeToConfiguration(conf);
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
- Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
- Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- }
+public class TestDAGAccessControls {
@Test(timeout = 5000)
public void testStringBasedConstructor() {
@@ -102,8 +40,107 @@ public class TestDAGAccessControls {
Assert.assertTrue(dagAccessControls.getUsersWithModifyACLs().contains("u2"));
Assert.assertTrue(dagAccessControls.getGroupsWithViewACLs().contains("g1"));
Assert.assertTrue(dagAccessControls.getGroupsWithModifyACLs().contains("g2"));
+ }
+
+ @Test(timeout=5000)
+ public void testMergeIntoAmAcls() {
+ DAGAccessControls dagAccessControls = new DAGAccessControls("u1 g1", "u2 g2");
+ Configuration conf = new Configuration(false);
+
+ // default conf should have ACLs copied over.
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // both have the same users and groups; merged result should be unchanged
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u1 g1");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u2 g2");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // both have unique users and groups; merged result should contain all of them
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("u3,u1 g3,g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("u4,u2 g4,g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // one of the users is *, so the merged result is always *
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // only * in the config, merged is *
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // DAG access is *, so every merge yields *
+ dagAccessControls = new DAGAccessControls("*", "*");
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ // DAG access is empty, conf should be same.
+ dagAccessControls = new DAGAccessControls("", "");
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("u3 g3", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("u4 g4", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+ conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+ conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+ dagAccessControls.mergeIntoAmAcls(conf);
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+ assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
}
+ public void assertACLS(String expected, String obtained) {
+ if (expected.equals(obtained)) {
+ return;
+ }
+ String [] parts1 = expected.split(" ");
+ String [] parts2 = obtained.split(" ");
+
+ Assert.assertEquals(parts1.length, parts2.length);
+
+ Assert.assertEquals(
+ Sets.newHashSet(parts1[0].split(",")), Sets.newHashSet(parts2[0].split(",")));
+
+ if (parts1.length < 2) {
+ return;
+ }
+ Assert.assertEquals(
+ Sets.newHashSet(parts1[1].split(",")), Sets.newHashSet(parts2[1].split(",")));
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 005c027..8e1011f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -370,14 +370,14 @@ public class TestDAGPlan {
dag.addVertex(v1);
// Should succeed. Default context is containers.
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
// Set execute in AM should fail
v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
try {
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
} catch (IllegalStateException e) {
@@ -386,13 +386,13 @@ public class TestDAGPlan {
// Valid context
v1.setExecutionContext(validExecContext);
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
// Invalid task scheduler
v1.setExecutionContext(invalidExecContext1);
try {
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
} catch (IllegalStateException e) {
@@ -404,7 +404,7 @@ public class TestDAGPlan {
// Invalid ContainerLauncher
v1.setExecutionContext(invalidExecContext2);
try {
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
} catch (IllegalStateException e) {
@@ -416,7 +416,7 @@ public class TestDAGPlan {
// Invalid task comm
v1.setExecutionContext(invalidExecContext3);
try {
- dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ dag.createDag(new TezConfiguration(false), null, null, null, true,
servicePluginsDescriptor, null);
fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
} catch (IllegalStateException e) {
@@ -444,7 +444,8 @@ public class TestDAGPlan {
new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
- Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context);
+ Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1))
+ .setExecutionContext(v1Context);
Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
.addTaskLocalFiles(new HashMap<String, LocalResource>());
@@ -462,7 +463,7 @@ public class TestDAGPlan {
dag.addVertex(v1).addVertex(v2).addEdge(edge);
dag.setExecutionContext(defaultExecutionContext);
- DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null,
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true,
servicePluginsDescriptor, null);
assertEquals(2, dagProto.getVertexCount());
@@ -502,8 +503,7 @@ public class TestDAGPlan {
TezConfiguration conf = new TezConfiguration(false);
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:+UseParallelGC ");
try {
- DAGPlan dagProto = dag.createDag(conf, null, null, null, true, null, null,
- new JavaOptsChecker());
+ dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
fail("Expected dag creation to fail for invalid java opts");
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains("Invalid/conflicting GC options"));
@@ -511,12 +511,11 @@ public class TestDAGPlan {
// Should not fail as java opts valid
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:-UseParallelGC ");
- DAGPlan dagProto1 = dag.createDag(conf, null, null, null, true, null, null,
- new JavaOptsChecker());
+ dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
// Should not fail as no checker enabled
conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -XX:+UseParallelGC ");
- DAGPlan dagProto2 = dag.createDag(conf, null, null, null, true, null, null, null);
+ dag.createDag(conf, null, null, null, true, null, null);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index c566b1a..794a597 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.api;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,9 +35,8 @@ import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.junit.Assert;
@@ -1044,21 +1044,11 @@ public class TestDAGVerify {
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- ConfigurationProto confProto = dagPlan.getDagConf();
- boolean foundViewAcls = false;
- boolean foundModifyAcls = false;
-
- for (PlanKeyValuePair pair : confProto.getConfKeyValuesList()) {
- if (pair.getKey().equals(TezConstants.TEZ_DAG_VIEW_ACLS)) {
- foundViewAcls = true;
- Assert.assertEquals("u1 g1", pair.getValue());
- } else if (pair.getKey().equals(TezConstants.TEZ_DAG_MODIFY_ACLS)) {
- foundModifyAcls = true;
- Assert.assertEquals("*", pair.getValue());
- }
- }
- Assert.assertTrue(foundViewAcls);
- Assert.assertTrue(foundModifyAcls);
+ ACLInfo aclInfo = dagPlan.getAclInfo();
+ Assert.assertEquals(Collections.singletonList("u1"), aclInfo.getUsersWithViewAccessList());
+ Assert.assertEquals(Collections.singletonList("g1"), aclInfo.getGroupsWithViewAccessList());
+ Assert.assertEquals(Collections.singletonList("*"), aclInfo.getUsersWithModifyAccessList());
+ Assert.assertEquals(Collections.singletonList("g2"), aclInfo.getGroupsWithModifyAccessList());
}
// v1 has input initializer
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 6f795fc..dc04f2d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -44,6 +46,8 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
public class TestDagTypeConverters {
@Test(timeout = 5000)
@@ -208,6 +212,40 @@ public class TestDagTypeConverters {
verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true);
}
+ @Test
+ public void testAclConversions() {
+ DAGAccessControls dagAccessControls = new DAGAccessControls("u1,u2 g1,g2", "u3,u4 g3,g4");
+ ACLInfo aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+ assertSame(dagAccessControls, aclInfo);
+ assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+ dagAccessControls = new DAGAccessControls("u1 ", "u2 ");
+ aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+ assertSame(dagAccessControls, aclInfo);
+ assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+ dagAccessControls = new DAGAccessControls(" g1", " g3,g4");
+ aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+ assertSame(dagAccessControls, aclInfo);
+ assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+ dagAccessControls = new DAGAccessControls("*", "*");
+ aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+ assertSame(dagAccessControls, aclInfo);
+ assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+ }
+
+ private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) {
+ assertEquals(dagAccessControls.getUsersWithViewACLs(),
+ Sets.newHashSet(aclInfo.getUsersWithViewAccessList()));
+ assertEquals(dagAccessControls.getUsersWithModifyACLs(),
+ Sets.newHashSet(aclInfo.getUsersWithModifyAccessList()));
+ assertEquals(dagAccessControls.getGroupsWithViewACLs(),
+ Sets.newHashSet(aclInfo.getGroupsWithViewAccessList()));
+ assertEquals(dagAccessControls.getGroupsWithModifyACLs(),
+ Sets.newHashSet(aclInfo.getGroupsWithModifyAccessList()));
+ }
+
private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int expectedCount,
String baseString, boolean hasPayload) {
assertEquals(expectedCount, entities.size());
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index fd6d446..481353b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -542,7 +542,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(),
- this.dagConf);
+ this.jobPlan.getAclInfo());
// this is only for recovery in case it does not call the init transition
this.startDAGCpuTime = appContext.getCumulativeCPUTime();
this.startDAGGCTime = appContext.getCumulativeGCTime();
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
index 91ffe7b..45e24ce 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -154,7 +154,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
if (domainId != null) {
// do nothing
LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
- return null;
} else {
if (!autoCreateDomain) {
// Error - Cannot fallback to default as that leaves ACLs open
@@ -164,18 +163,13 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
domainId = DOMAIN_ID_PREFIX + applicationId.toString();
createTimelineDomain(domainId, tezConf, dagAccessControls);
LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
- return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
}
+ return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
}
private Map<String, String> createDAGDomain(Configuration tezConf,
ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
throws IOException, HistoryACLPolicyException {
- if (dagAccessControls == null) {
- // No DAG specific ACLs
- return null;
- }
-
String domainId =
tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -193,7 +187,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
if (domainId != null) {
// do nothing
LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
- return null;
} else {
if (!autoCreateDomain) {
// Error - Cannot fallback to default as that leaves ACLs open
@@ -201,14 +194,17 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
+ " Domains is disabled");
}
+ // Create a domain only if dagAccessControls has been specified.
+ if (dagAccessControls == null) {
+ return null;
+ }
domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
createTimelineDomain(domainId, tezConf, dagAccessControls);
LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
- return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
}
+ return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
}
-
@Override
public void setConf(Configuration conf) {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 2c976f5..6b3ebd7 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.ReflectionUtils;
@@ -54,6 +53,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -73,7 +73,6 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
-import org.mockito.Matchers;
public class TestATSHistoryWithACLs {
@@ -307,170 +306,6 @@ public class TestATSHistoryWithACLs {
}
/**
- * test failure of domain creation during dag submittion in session mode
- * only affect logging for that dag not following submitted dag
- * @throws Exception
- */
- @Test (timeout=50000)
- public void testMultipleDagSession() throws Exception {
- TezClient tezSession = null;
- String viewAcls = "nobody nobody_group";
- SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
- DAG dag = DAG.create("TezSleepProcessor");
- Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag.addVertex(vertex);
- DAGAccessControls accessControls = new DAGAccessControls();
- accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
- accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
- dag.setAccessControls(accessControls);
-
- TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
- tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
- tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- ATSHistoryLoggingService.class.getName());
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
- .nextInt(100000))));
- remoteFs.mkdirs(remoteStagingDir);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
- tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
- tezSession.start();
-
- //////submit first dag which fails in dag creation//////
- ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
- atsHistoryACLManagerClassName);
- myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
- doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
- tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
- DAGClient dagClient = tezSession.submitDAG(dag);
- DAGStatus dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- assertEquals(dagLogging, "false");
-
- myAclPolicyManager.timelineClient = null;
- myAclPolicyManager.setConf(tezConf);
- tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
- //////submit second dag which succeeds in dag creation//////
- DAG dag2 = DAG.create("TezSleepProcessor2");
- vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag2.addVertex(vertex);
- accessControls = new DAGAccessControls();
- accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
- accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
- dag2.setAccessControls(accessControls);
- dagClient = tezSession.submitDAG(dag2);
- dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- Assert.assertNull(dagLogging);
- myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
- tezSession.stop();
- verify(myAclPolicyManager.timelineClient, times(1)).stop();
- }
-
-/**
- * test failure of domain creation during dag submittion in nonsession mode
- * only affect logging for that dag not following submitted dag
- * @throws Exception
- */
- @Test (timeout=50000)
- public void testMultipleDagNonSession() throws Exception {
- TezClient tezClient = null;
- String viewAcls = "nobody nobody_group";
- SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
- DAG dag = DAG.create("TezSleepProcessor");
- Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag.addVertex(vertex);
- DAGAccessControls accessControls = new DAGAccessControls();
- accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
- accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
- dag.setAccessControls(accessControls);
-
- TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
- tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
- tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- ATSHistoryLoggingService.class.getName());
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
- .nextInt(100000))));
- remoteFs.mkdirs(remoteStagingDir);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
- tezClient = TezClient.create("TezSleepProcessor", tezConf, false);
- tezClient.start();
-
- //////submit first dag which fails in dag creation//////
- ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
- atsHistoryACLManagerClassName);
- myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
- doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
- tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
- DAGClient dagClient = tezClient.submitDAG(dag);
- DAGStatus dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- assertEquals(dagLogging, "false");
-
- myAclPolicyManager.timelineClient = null;
- myAclPolicyManager.setConf(tezConf);
- tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
- //////submit second dag which succeeds in dag creation//////
- DAG dag2 = DAG.create("TezSleepProcessor2");
- vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag2.addVertex(vertex);
- accessControls = new DAGAccessControls();
- accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
- accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
- dag2.setAccessControls(accessControls);
- dagClient = tezClient.submitDAG(dag2);
- dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- Assert.assertNull(dagLogging);
- myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
- tezClient.stop();
- verify(myAclPolicyManager.timelineClient, times(1)).stop();
-
- }
- /**
* Test Disable Logging for all dags in a session
* due to failure to create domain in session start
* @throws Exception
@@ -501,19 +336,8 @@ public class TestATSHistoryWithACLs {
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
- ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
- atsHistoryACLManagerClassName);
- myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
- doThrow(new IOException("Fail to Put Domain")).
- when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
- tezSession.setUpHistoryAclManager(myAclPolicyManager);
tezSession.start();
- ///substitute back mocked timelineClient with a normal one
- myAclPolicyManager.timelineClient = null;
- myAclPolicyManager.setConf(tezConf);
- tezSession.setUpHistoryAclManager(myAclPolicyManager);
//////submit first dag //////
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -524,9 +348,7 @@ public class TestATSHistoryWithACLs {
dagStatus = dagClient.getDAGStatus(null);
}
assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- assertEquals(dagLogging, "false");
-
+
//////submit second dag//////
DAG dag2 = DAG.create("TezSleepProcessor2");
vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
@@ -545,10 +367,9 @@ public class TestATSHistoryWithACLs {
Thread.sleep(500l);
dagStatus = dagClient.getDAGStatus(null);
}
- dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
- assertEquals(dagLogging, "false");
tezSession.stop();
}
+
/**
* use mini cluster to verify data do not push to ats when the daglogging flag
* in dagsubmittedevent is set off
@@ -559,6 +380,9 @@ public class TestATSHistoryWithACLs {
ATSHistoryLoggingService historyLoggingService;
historyLoggingService =
ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+ historyLoggingService.setAppContext(appContext);
TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
String viewAcls = "nobody nobody_group";
tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -568,8 +392,8 @@ public class TestATSHistoryWithACLs {
.nextInt(100000))));
remoteFs.mkdirs(remoteStagingDir);
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
- historyLoggingService.serviceInit(tezConf);
- historyLoggingService.serviceStart();
+ historyLoggingService.init(tezConf);
+ historyLoggingService.start();
ApplicationId appId = ApplicationId.newInstance(100l, 1);
TezDAGID tezDAGID = TezDAGID.getInstance(
appId, 100);
@@ -601,6 +425,9 @@ public class TestATSHistoryWithACLs {
ATSHistoryLoggingService historyLoggingService;
historyLoggingService =
ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+ historyLoggingService.setAppContext(appContext);
TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
String viewAcls = "nobody nobody_group";
tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -610,8 +437,8 @@ public class TestATSHistoryWithACLs {
.nextInt(100000))));
remoteFs.mkdirs(remoteStagingDir);
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
- historyLoggingService.serviceInit(tezConf);
- historyLoggingService.serviceStart();
+ historyLoggingService.init(tezConf);
+ historyLoggingService.start();
ApplicationId appId = ApplicationId.newInstance(100l, 1);
TezDAGID tezDAGID = TezDAGID.getInstance(
appId, 11);
@@ -636,7 +463,7 @@ public class TestATSHistoryWithACLs {
assertEquals(entity.getEntityType(), "TEZ_DAG_ID");
assertEquals(entity.getEvents().get(0).getEventType(), HistoryEventType.DAG_SUBMITTED.toString());
}
-
+
private static final String atsHistoryACLManagerClassName =
"org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
@Test (timeout=50000)
@@ -676,5 +503,4 @@ public class TestATSHistoryWithACLs {
}
}
-
}