You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/09/12 20:29:58 UTC

[1/2] tez git commit: TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. (Harish Jaiprakash via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master c07ec7b6f -> a23de4982


http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
index f0ec1eb..d28ffe0 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/ats/acls/ATSV15HistoryACLPolicyManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -159,7 +158,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -169,18 +167,13 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
       domainId = DOMAIN_ID_PREFIX + applicationId.toString();
       createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
-      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
     }
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
   }
 
   private Map<String, String> createDAGDomain(Configuration tezConf,
       ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
       throws IOException, HistoryACLPolicyException {
-    if (dagAccessControls == null) {
-      // No DAG specific ACLs
-      return null;
-    }
-
     String domainId =
         tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
     if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -198,7 +191,6 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -206,14 +198,17 @@ public class ATSV15HistoryACLPolicyManager implements HistoryACLPolicyManager {
             + " Domains is disabled");
       }
 
+      // Create a domain only if dagAccessControls has been specified.
+      if (dagAccessControls == null) {
+        return null;
+      }
       domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
       createTimelineDomain(applicationId, domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
-      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
     }
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
   }
 
-
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index dd21d2d..a095cbc 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -36,11 +37,16 @@ import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -80,7 +86,9 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
       ATSV15HistoryLoggingService.class.getName();
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
-  private HistoryACLPolicyManager historyACLPolicyManager;
+
+  @VisibleForTesting
+  HistoryACLPolicyManager historyACLPolicyManager;
 
   private int numDagsPerGroup;
 
@@ -133,7 +141,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
     if (maxTimeToWaitOnShutdown < 0) {
       waitForeverOnShutdown = true;
     }
-    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
     LOG.info("Initializing " + ATSV15HistoryLoggingService.class.getSimpleName() + " with "
         + ", maxPollingTime(ms)=" + maxPollingTimeMillis
@@ -165,6 +172,15 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
     }
     timelineClient.start();
 
+    // create a session domain id, if it fails then disable history logging.
+    try {
+      sessionDomainId = createSessionDomain();
+    } catch (HistoryACLPolicyException | IOException e) {
+      LOG.warn("Could not setup history acls, disabling history logging.", e);
+      historyLoggingEnabled = false;
+      return;
+    }
+
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -216,9 +232,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStop() {
-    if (!historyLoggingEnabled || timelineClient == null) {
-      return;
-    }
     LOG.info("Stopping ATSService"
         + ", eventQueueBacklog=" + eventQueue.size());
     stopped.set(true);
@@ -265,11 +278,12 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
       LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
           + ", eventQueueBacklog=" + eventQueue.size());
     }
-    timelineClient.stop();
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     if (historyACLPolicyManager != null) {
       historyACLPolicyManager.close();
     }
-
   }
 
   @VisibleForTesting
@@ -331,13 +345,6 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
         skippedDAGs.add(dagId);
         return false;
       }
-      if (historyACLPolicyManager != null) {
-        String dagDomainId = dagSubmittedEvent.getConf().get(
-            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
-        if (dagDomainId != null) {
-          dagDomainIdMap.put(dagId, dagDomainId);
-        }
-      }
     }
     if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
       DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent();
@@ -363,28 +370,17 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
   }
 
   private void handleEvents(DAGHistoryEvent event) {
-    String domainId = sessionDomainId;
-    TezDAGID dagId = event.getDagID();
-
-    if (historyACLPolicyManager != null && dagId != null) {
-      if (dagDomainIdMap.containsKey(dagId)) {
-        domainId = dagDomainIdMap.get(dagId);
-      }
+    String domainId = getDomainForEvent(event);
+    // skippedDags is updated in the above call so check again.
+    if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+      return;
     }
 
-    TimelineEntity entity =
-        HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+    TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+        event.getHistoryEvent());
 
-    if (historyACLPolicyManager != null) {
-      if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
-        if (domainId != null && !domainId.isEmpty()) {
-          historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
-        }
-      } else {
-        if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
-          historyACLPolicyManager.updateTimelineEntityDomain(entity, sessionDomainId);
-        }
-      }
+    if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+      historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
     }
 
     try {
@@ -408,7 +404,99 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
     } catch (Exception e) {
       LOG.warn("Could not handle history events", e);
     }
+  }
 
+  private String getDomainForEvent(DAGHistoryEvent event) {
+    String domainId = sessionDomainId;
+    if (historyACLPolicyManager == null) {
+      return domainId;
+    }
+
+    TezDAGID dagId = event.getDagID();
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+      return domainId;
+    }
+
+    if (dagDomainIdMap.containsKey(dagId)) {
+      // If we already have the domain for the dag id return it
+      domainId = dagDomainIdMap.get(dagId);
+      // Cleanup if this is the last event.
+      if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+        dagDomainIdMap.remove(dagId);
+      }
+    } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+        || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+      // In case this is the first event for the dag, create and populate dag domain.
+      Configuration conf;
+      DAGPlan dagPlan;
+      if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+          conf = ((DAGSubmittedEvent)historyEvent).getConf();
+          dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+      } else {
+         conf = appContext.getCurrentDAG().getConf();
+         dagPlan = appContext.getCurrentDAG().getJobPlan();
+      }
+      domainId = createDagDomain(conf, dagPlan, dagId);
+
+      // createDagDomain updates skippedDAGs so another check here.
+      if (skippedDAGs.contains(dagId)) {
+        return null;
+      }
+
+      dagDomainIdMap.put(dagId, domainId);
+    }
+    return domainId;
+  }
+
+  /**
+   * Creates a domain for the session.
+   * @return domainId to be used. null if acls are disabled.
+   * @throws HistoryACLPolicyException, IOException Forward if historyACLPolicyManger exception.
+   */
+  private String createSessionDomain() throws IOException, HistoryACLPolicyException {
+    if (historyACLPolicyManager == null) {
+      return null;
+    }
+    Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(),
+        appContext.getApplicationID());
+    if (domainInfo != null) {
+      return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    }
+    return null;
   }
 
+  /**
+   * When running in session mode, create a domain for the dag and return it.
+   * @param dagConf The configuration the dag for which domain has to be created.
+   * @param dagPlan The dag plan which contains the ACLs.
+   * @param dagId The dagId for which domain has to be created.
+   * @return The created domain id on success.
+   *     sessionDomainId: If there is a failure also disable history logging for this dag.
+   *     sessionDomainId: If historyACLPolicyManager returns null.
+   */
+  private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) {
+    // In non session mode dag domain is same as session domain id.
+    if (!appContext.isSession()) {
+      return sessionDomainId;
+    }
+    DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+        ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+        : null;
+    try {
+      Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs(
+          dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()),
+          dagAccessControls);
+      if (domainInfo != null) {
+        return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+      }
+      // Fallback to session domain, if domainInfo was null
+      return sessionDomainId;
+    } catch (IOException | HistoryACLPolicyException e) {
+      LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e);
+      skippedDAGs.add(dagId);
+      // Return value is not used, check for skippedDAG is important.
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index 6f653ad..a690a19 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -300,6 +301,7 @@ public class TestATSHistoryV15 {
       ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
       AppContext appContext = mock(AppContext.class);
       when(appContext.getApplicationID()).thenReturn(appId);
+      when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
       service.setAppContext(appContext);
 
       TimelineEntityGroupId grpId = service.getGroupId(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index 1869b56..9111195 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -22,11 +22,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +51,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -66,10 +75,14 @@ public class TestATSV15HistoryLoggingService {
   private static String user = "TEST_USER";
 
   private InMemoryTimelineClient timelineClient;
+  private AppContext appContext;
 
   @Test(timeout=2000)
   public void testDAGGroupingDefault() throws Exception {
     ATSV15HistoryLoggingService service = createService(-1);
+
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -96,6 +109,8 @@ public class TestATSV15HistoryLoggingService {
   @Test(timeout=2000)
   public void testDAGGroupingDisabled() throws Exception {
     ATSV15HistoryLoggingService service = createService(1);
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -123,6 +138,8 @@ public class TestATSV15HistoryLoggingService {
   public void testDAGGroupingGroupingEnabled() throws Exception {
     int numDagsPerGroup = 100;
     ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    service.start();
+
     TezDAGID dagId1 = TezDAGID.getInstance(appId, 1);
     for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
       service.handle(event);
@@ -172,14 +189,243 @@ public class TestATSV15HistoryLoggingService {
     service.stop();
   }
 
+  @Test
+  public void testNonSessionDomains() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+    service.start();
+
+    verify(historyACLPolicyManager, times(1))
+        .setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+    assertTrue(timelineClient.entityLog.size() > 0);
+
+    service.stop();
+  }
+
+  @Test
+  public void testNonSessionDomainsFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenThrow(new IOException());
+
+    service.start();
+
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // History logging is disabled.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertEquals(0, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
+  @Test
+  public void testNonSessionDomainsAclNull() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenReturn(null);
+
+    service.start();
+
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // No domain updates but history logging happened.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertTrue(timelineClient.entityLog.size() > 0);
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomains() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+    service.start();
+
+    // Verify that the session domain was created.
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id"));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // Verify dag domain was created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // calls were made with correct domain ids.
+    verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+    verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id"));
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomainsFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenThrow(new IOException());
+
+    service.start();
+
+    // Verify that the session domain was created.
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-id"));
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag creation was done.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // No history logging calls were done
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+    assertEquals(0, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
+  @Test
+  public void testSessionDomainsDagFailed() throws Exception {
+    ATSV15HistoryLoggingService service = createService(-1);
+
+    when(appContext.isSession()).thenReturn(true);
+
+    HistoryACLPolicyManager historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
+    service.historyACLPolicyManager = historyACLPolicyManager;
+
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), eq(appId)))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+
+    service.start();
+
+    // Verify that the session domain creation was called.
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(), eq(appId));
+
+    // Mock dag domain creation.
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any()))
+    .thenThrow(new IOException());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, service)) {
+      service.handle(event);
+    }
+    while (!service.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // Verify dag domain creation was called.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // AM events sent, dag events are not sent.
+    verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("dag-id"));
+    assertEquals(1, timelineClient.entityLog.size());
+
+    service.stop();
+  }
+
   private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
     ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
-    AppContext appContext = mock(AppContext.class);
+    appContext = mock(AppContext.class);
     when(appContext.getApplicationID()).thenReturn(appId);
     when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
     service.setAppContext(appContext);
 
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(false);
     if (numDagsPerGroup != -1) {
       conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
           numDagsPerGroup);
@@ -191,7 +437,6 @@ public class TestATSV15HistoryLoggingService {
     timelineClient.init(conf);
     service.timelineClient = timelineClient;
 
-    service.start();
     return service;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 6b8d6e5..dc215fd 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -36,13 +38,17 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezReflectionException;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -54,8 +60,8 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ATSHistoryLoggingService.class);
 
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<DAGHistoryEvent>();
+  @VisibleForTesting
+  LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue<DAGHistoryEvent>();
 
   private Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -80,7 +86,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
-  private HistoryACLPolicyManager historyACLPolicyManager;
+
+  @VisibleForTesting
+  HistoryACLPolicyManager historyACLPolicyManager;
 
   public ATSHistoryLoggingService() {
     super(ATSHistoryLoggingService.class.getName());
@@ -121,7 +129,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (maxTimeToWaitOnShutdown < 0) {
       waitForeverOnShutdown = true;
     }
-    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
 
     LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with "
       + "maxEventsPerBatch=" + maxEventsPerBatch
@@ -150,8 +157,17 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (!historyLoggingEnabled || timelineClient == null) {
       return;
     }
+
     timelineClient.start();
 
+    try {
+      sessionDomainId = createSessionDomain();
+    } catch (HistoryACLPolicyException | IOException e) {
+      LOG.warn("Could not setup history acls, disabling history logging.", e);
+      historyLoggingEnabled = false;
+      return;
+    }
+
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -200,9 +216,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   @Override
   public void serviceStop() {
-    if (!historyLoggingEnabled || timelineClient == null) {
-      return;
-    }
     LOG.info("Stopping ATSService"
         + ", eventQueueBacklog=" + eventQueue.size());
     stopped.set(true);
@@ -242,7 +255,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
           + ", eventQueueBacklog=" + eventQueue.size());
     }
-    timelineClient.stop();
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     if (historyACLPolicyManager != null) {
       historyACLPolicyManager.close();
     }
@@ -289,13 +304,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
         skippedDAGs.add(dagId);
         return false;
       }
-      if (historyACLPolicyManager != null) {
-        String dagDomainId = dagSubmittedEvent.getConf().get(
-            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
-        if (dagDomainId != null) {
-          dagDomainIdMap.put(dagId, dagDomainId);
-        }
-      }
     }
     if (eventType.equals(HistoryEventType.DAG_RECOVERED)) {
       DAGRecoveredEvent dagRecoveredEvent = (DAGRecoveredEvent) event.getHistoryEvent();
@@ -321,33 +329,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
   }
 
   private void handleEvents(List<DAGHistoryEvent> events) {
-    TimelineEntity[] entities = new TimelineEntity[events.size()];
-    for (int i = 0; i < events.size(); ++i) {
-      DAGHistoryEvent event = events.get(i);
-      String domainId = sessionDomainId;
-
-      TezDAGID dagId = event.getDagID();
-
-      if (historyACLPolicyManager != null && dagId != null) {
-        if (dagDomainIdMap.containsKey(dagId)) {
-          domainId = dagDomainIdMap.get(dagId);
-        }
+    List<TimelineEntity> entities = new ArrayList<>(events.size());
+    for (DAGHistoryEvent event : events) {
+      String domainId = getDomainForEvent(event);
+      // skippedDags is updated in the above call so check again.
+      if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
+        continue;
       }
-
-      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
-
-      if (historyACLPolicyManager != null) {
-        if (HistoryEventType.isDAGSpecificEvent(event.getHistoryEvent().getEventType())) {
-          if (domainId != null && !domainId.isEmpty()) {
-            historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId);
-          }
-        } else {
-          if (sessionDomainId != null && !sessionDomainId.isEmpty()) {
-            historyACLPolicyManager.updateTimelineEntityDomain(entities[i], sessionDomainId);
-          }
-        }
+      TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+          event.getHistoryEvent());
+      entities.add(entity);
+      if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
+        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
       }
-
     }
 
     if (LOG.isDebugEnabled()) {
@@ -355,7 +349,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     }
     try {
       TimelinePutResponse response =
-          timelineClient.putEntities(entities);
+          timelineClient.putEntities(entities.toArray(new TimelineEntity[entities.size()]));
       if (response != null
         && !response.getErrors().isEmpty()) {
         int count = response.getErrors().size();
@@ -375,4 +369,97 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     }
   }
 
+  private String getDomainForEvent(DAGHistoryEvent event) {
+    String domainId = sessionDomainId;
+    if (historyACLPolicyManager == null) {
+      return domainId;
+    }
+
+    TezDAGID dagId = event.getDagID();
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    if (dagId == null || !HistoryEventType.isDAGSpecificEvent(historyEvent.getEventType())) {
+      return domainId;
+    }
+
+    if (dagDomainIdMap.containsKey(dagId)) {
+      // If we already have the domain for the dag id return it
+      domainId = dagDomainIdMap.get(dagId);
+      // Cleanup if this is the last event.
+      if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+        dagDomainIdMap.remove(dagId);
+      }
+    } else if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()
+        || HistoryEventType.DAG_RECOVERED == historyEvent.getEventType()) {
+      // In case this is the first event for the dag, create and populate dag domain.
+      Configuration conf;
+      DAGPlan dagPlan;
+      if (HistoryEventType.DAG_SUBMITTED == historyEvent.getEventType()) {
+          conf = ((DAGSubmittedEvent)historyEvent).getConf();
+          dagPlan = ((DAGSubmittedEvent)historyEvent).getDAGPlan();
+      } else {
+         conf = appContext.getCurrentDAG().getConf();
+         dagPlan = appContext.getCurrentDAG().getJobPlan();
+      }
+      domainId = createDagDomain(conf, dagPlan, dagId);
+
+      // createDagDomain updates skippedDAGs so another check here.
+      if (skippedDAGs.contains(dagId)) {
+        return null;
+      }
+
+      dagDomainIdMap.put(dagId, domainId);
+    }
+    return domainId;
+  }
+
+  /**
+   * Creates a domain for the session.
+   * @return domainId to be used. null if acls are disabled.
+   * @throws HistoryACLPolicyException, IOException Forward if historyACLPolicyManger exception.
+   */
+  private String createSessionDomain() throws HistoryACLPolicyException, IOException {
+    if (historyACLPolicyManager == null) {
+      return null;
+    }
+    Map<String, String> domainInfo = historyACLPolicyManager.setupSessionACLs(getConfig(),
+        appContext.getApplicationID());
+    if (domainInfo != null) {
+      return domainInfo.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    }
+    return null;
+  }
+
+  /**
+   * When running in session mode, create a domain for the dag and return it.
+   * @param dagConf The configuration the dag for which domain has to be created.
+   * @param dagPlan The dag plan which contains the ACLs.
+   * @param dagId The dagId for which domain has to be created.
+   * @return The created domain id on success.
+   *     sessionDomainId: If there is a failure also disable history logging for this dag.
+   *     sessionDomainId: If historyACLPolicyManager returns null.
+   */
+  private String createDagDomain(Configuration dagConf, DAGPlan dagPlan, TezDAGID dagId) {
+    // In non session mode dag domain is same as session domain id.
+    if (!appContext.isSession()) {
+      return sessionDomainId;
+    }
+    DAGAccessControls dagAccessControls = dagPlan.hasAclInfo()
+        ? DagTypeConverters.convertDAGAccessControlsFromProto(dagPlan.getAclInfo())
+        : null;
+    try {
+      Map<String, String> domainInfo = historyACLPolicyManager.setupSessionDAGACLs(
+          dagConf, appContext.getApplicationID(), Integer.toString(dagId.getId()),
+          dagAccessControls);
+      if (domainInfo != null) {
+        return domainInfo.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+      }
+      // Fallback to session domain, if domainInfo was null
+      return sessionDomainId;
+    } catch (IOException | HistoryACLPolicyException e) {
+      LOG.warn("Could not setup ACLs for DAG, disabling history logging for dag.", e);
+      skippedDAGs.add(dagId);
+      // Return value is not used, check for skippedDAG is important.
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 464864e..da57eb2 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -21,16 +21,30 @@ package org.apache.tez.dag.history.logging.ats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,9 +53,19 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class TestATSHistoryLoggingService {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryLoggingService.class);
@@ -51,11 +75,15 @@ public class TestATSHistoryLoggingService {
   private Configuration conf;
   private int atsInvokeCounter;
   private int atsEntitiesCounter;
+  private HistoryACLPolicyManager historyACLPolicyManager;
   private SystemClock clock = new SystemClock();
+  private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+  private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
 
   @Before
   public void setup() throws Exception {
     appContext = mock(AppContext.class);
+    historyACLPolicyManager = mock(HistoryACLPolicyManager.class);
     atsHistoryLoggingService = new ATSHistoryLoggingService();
     atsHistoryLoggingService.setAppContext(appContext);
     conf = new Configuration(false);
@@ -63,13 +91,16 @@ public class TestATSHistoryLoggingService {
         1000l);
     conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
     conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.set(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain");
     atsInvokeCounter = 0;
     atsEntitiesCounter = 0;
     atsHistoryLoggingService.init(conf);
+    atsHistoryLoggingService.historyACLPolicyManager = historyACLPolicyManager;
     atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
-    atsHistoryLoggingService.start();
+
     when(appContext.getClock()).thenReturn(clock);
     when(appContext.getCurrentDAGID()).thenReturn(null);
+    when(appContext.getApplicationID()).thenReturn(appId);
     when(atsHistoryLoggingService.timelineClient.putEntities(
         Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
         new Answer<Object>() {
@@ -96,6 +127,7 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testATSHistoryLoggingServiceShutdown() {
+    atsHistoryLoggingService.start();
     TezDAGID tezDAGID = TezDAGID.getInstance(
         ApplicationId.newInstance(100l, 1), 1);
     DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -122,6 +154,7 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testATSEventBatching() {
+    atsHistoryLoggingService.start();
     TezDAGID tezDAGID = TezDAGID.getInstance(
         ApplicationId.newInstance(100l, 1), 1);
     DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
@@ -145,9 +178,8 @@ public class TestATSHistoryLoggingService {
 
   @Test(timeout=20000)
   public void testTimelineServiceDisable() throws Exception {
+    atsHistoryLoggingService.start();
     ATSHistoryLoggingService atsHistoryLoggingService1;
-    AppContext appContext1;
-    appContext1 = mock(AppContext.class);
     atsHistoryLoggingService1 = new ATSHistoryLoggingService();
 
     atsHistoryLoggingService1.setAppContext(appContext);
@@ -160,7 +192,7 @@ public class TestATSHistoryLoggingService {
         ++atsInvokeCounter;
         atsEntitiesCounter += invocation.getArguments().length;
         try {
-          Thread.sleep(500l);
+          Thread.sleep(10l);
         } catch (InterruptedException e) {
           // do nothing
         }
@@ -181,7 +213,7 @@ public class TestATSHistoryLoggingService {
     }
 
     try {
-        Thread.sleep(1000l);
+        Thread.sleep(20l);
     } catch (InterruptedException e) {
         // Do nothing
     }
@@ -190,6 +222,238 @@ public class TestATSHistoryLoggingService {
     Assert.assertEquals(atsInvokeCounter, 0);
     Assert.assertEquals(atsEntitiesCounter, 0);
     Assert.assertNull(atsHistoryLoggingService1.timelineClient);
-    
+    atsHistoryLoggingService1.close();
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomains() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-id"));
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomainsFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenThrow(new IOException());
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(1000);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+    Assert.assertEquals(0, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testNonSessionDomainsAclNull() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenReturn(null);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs(
+        (Configuration)any(), (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
+    Assert.assertEquals(5, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomains() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "test-domain"));
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain"));
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain"));
+    verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain"));
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenThrow(new IOException());
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(
+        Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, "dag-domain"));
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(1000);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(0))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // No calls were made for domains.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(0, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsDagFailed() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenReturn(Collections.singletonMap(
+        TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, "session-domain"));
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+    .thenThrow(new IOException());
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // DAG domain was called once.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(1))
+        .updateTimelineEntityDomain(any(), eq("session-domain"));
+    verify(historyACLPolicyManager, times(1))
+        .updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(1, atsEntitiesCounter);
+  }
+
+  @Test(timeout=10000)
+  public void testSessionDomainsAclNull() throws Exception {
+    when(historyACLPolicyManager.setupSessionACLs((Configuration)any(), (ApplicationId)any()))
+    .thenReturn(null);
+
+    when(historyACLPolicyManager.setupSessionDAGACLs(
+        (Configuration)any(), (ApplicationId)any(), eq("0"), (DAGAccessControls)any()))
+    .thenReturn(null);
+
+    when(appContext.isSession()).thenReturn(true);
+    atsHistoryLoggingService.start();
+    verify(historyACLPolicyManager, times(1)).setupSessionACLs((Configuration)any(),
+        (ApplicationId)any());
+
+    // Send the event and wait for completion.
+    TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId1, atsHistoryLoggingService)) {
+      atsHistoryLoggingService.handle(event);
+    }
+    Thread.sleep(2500);
+    while (!atsHistoryLoggingService.eventQueue.isEmpty()) {
+      Thread.sleep(100);
+    }
+    // No dag domain were created.
+    verify(historyACLPolicyManager, times(1))
+      .setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
+
+    // All calls made with session domain id.
+    verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
+    Assert.assertEquals(5, atsEntitiesCounter);
+  }
+
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+      ATSHistoryLoggingService service) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(service.getConfig());
+    historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user")));
+    historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
+        DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents
+        .add(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
+            null, null)));
+    return historyEvents;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b0bacf4..3be7131 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -663,7 +663,8 @@ public class MRRSleepJob extends Configured implements Tool {
           " [-irt intermediateReduceSleepTime]" +
           " [-recordt recordSleepTime (msec)]" +
           " [-generateSplitsInAM (false)/true]" +
-          " [-writeSplitsToDfs (false)/true]");
+          " [-writeSplitsToDfs (false)/true]" +
+          " [-numDags numDagsToSubmit");
       ToolRunner.printGenericCommandUsage(System.err);
       return 2;
     }
@@ -676,6 +677,8 @@ public class MRRSleepJob extends Configured implements Tool {
     boolean writeSplitsToDfs = false;
     boolean generateSplitsInAM = false;
     boolean splitsOptionFound = false;
+    boolean isSession = false;
+    int numDags = 1;
 
     for(int i=0; i < args.length; i++ ) {
       if(args[i].equals("-m")) {
@@ -716,6 +719,12 @@ public class MRRSleepJob extends Configured implements Tool {
         }
         splitsOptionFound = true;
         writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+      } else if (args[i].equals("-numDags")) {
+        numDags = Integer.parseInt(args[++i]);
+        if (numDags < 1) {
+          throw new RuntimeException("numDags should be positive");
+        }
+        isSession = numDags > 1;
       }
     }
 
@@ -747,13 +756,20 @@ public class MRRSleepJob extends Configured implements Tool {
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
         iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
 
-    TezClient tezSession = TezClient.create("MRRSleep", conf, false, null, credentials);
+    TezClient tezSession = TezClient.create("MRRSleep", conf, isSession, null, credentials);
     tezSession.start();
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    dagClient.waitForCompletion();
-    tezSession.stop();
-
-    return dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED) ? 0 : 1;
+    try {
+      for (; numDags > 0; --numDags) {
+        DAGClient dagClient = tezSession.submitDAG(dag);
+        dagClient.waitForCompletion();
+        if (!dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED)) {
+          return 1;
+        }
+      }
+    } finally {
+      tezSession.stop();
+    }
+    return 0;
   }
 
 }


[2/2] tez git commit: TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. (Harish Jaiprakash via hitesh)

Posted by hi...@apache.org.
TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a23de498
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a23de498
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a23de498

Branch: refs/heads/master
Commit: a23de4982e4ed0d55fb711745e7670b3be4b266e
Parents: c07ec7b
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Sep 12 13:29:22 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Sep 12 13:29:22 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  89 +-----
 .../org/apache/tez/client/TezClientUtils.java   |  69 +----
 .../apache/tez/common/security/ACLManager.java  |  27 +-
 .../tez/common/security/DAGAccessControls.java  |  43 ++-
 .../security/HistoryACLPolicyManager.java       |  30 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  32 +--
 .../apache/tez/dag/api/DagTypeConverters.java   |  25 ++
 tez-api/src/main/proto/DAGApiRecords.proto      |   8 +
 .../org/apache/tez/client/TestTezClient.java    |   6 +-
 .../apache/tez/client/TestTezClientUtils.java   |  18 +-
 .../tez/common/security/TestACLManager.java     |  24 +-
 .../common/security/TestDAGAccessControls.java  | 167 ++++++-----
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  25 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  24 +-
 .../tez/dag/api/TestDagTypeConverters.java      |  38 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   2 +-
 .../ats/acls/ATSHistoryACLPolicyManager.java    |  16 +-
 .../ats/acls/TestATSHistoryWithACLs.java        | 202 +-------------
 .../ats/acls/ATSV15HistoryACLPolicyManager.java |  17 +-
 .../ats/ATSV15HistoryLoggingService.java        | 154 ++++++++---
 .../dag/history/ats/acls/TestATSHistoryV15.java |   2 +
 .../ats/TestATSV15HistoryLoggingService.java    | 251 ++++++++++++++++-
 .../logging/ats/ATSHistoryLoggingService.java   | 171 +++++++++---
 .../ats/TestATSHistoryLoggingService.java       | 276 ++++++++++++++++++-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  30 +-
 26 files changed, 1127 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd6ab68..99bae83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.
   TEZ-3272. Add AMContainerImpl and AMNodeImpl to StateMachine visualization list.
   TEZ-3284. Synchronization for every write in UnorderdKVWriter
   TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 71ba6b2..780fcb7 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.util.Time;
 import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -69,7 +68,6 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -79,7 +77,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequest
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.common.security.HistoryACLPolicyException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -136,11 +133,9 @@ public class TezClient {
   final TezApiVersionInfo apiVersionInfo;
   @VisibleForTesting
   final ServicePluginsDescriptor servicePluginsDescriptor;
-  private HistoryACLPolicyManager historyACLPolicyManager;
   private JavaOptsChecker javaOptsChecker = null;
 
   private int preWarmDAGCounter = 0;
-  private int dagCounter = 0;
 
   /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */
   private int maxSubmitDAGRequestSizeThroughIPC;
@@ -148,15 +143,6 @@ public class TezClient {
   private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
   private FileSystem stagingFs = null;
 
-  private static final String atsHistoryLoggingServiceClassName =
-      "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
-  private static final String atsHistoryACLManagerClassName =
-      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
-  private static final String atsv15HistoryLoggingServiceClassName =
-      "org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService";
-  private static final String atsV15HistoryACLManagerClassName =
-      "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
-
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
         TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
@@ -375,12 +361,6 @@ public class TezClient {
         historyLogLevel);
   }
 
-  @Private
-  @VisibleForTesting
-  public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) {
-    historyACLPolicyManager =  myAclPolicyManager;
-  }
-
   /**
    * Start the client. This establishes a connection to the YARN cluster.
    * In session mode, this start the App Master thats runs all the DAGs in the
@@ -395,40 +375,6 @@ public class TezClient {
     frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
     frameworkClient.start();
 
-    ///need additional check for historyACLPolicyManager because tests could stub historyACLPolicyManager
-    ///before tezclient start. If there is already a stubbed historyACLPolicyManager, we don't overwrite it
-    if (historyACLPolicyManager == null) {
-      //TODO: FIXME: The ACL manager should be retrieved either from the
-      //logging service directly or via a pluggable factory that can
-      //instantiate ACL managers and logging services
-      String logSvcClassName = amConfig.getTezConfiguration().get(
-          TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
-      String aclMgrClassName = null;
-      if (logSvcClassName.equals(atsHistoryLoggingServiceClassName)) {
-        aclMgrClassName = atsHistoryACLManagerClassName;
-      } else if (logSvcClassName.equals(
-        atsv15HistoryLoggingServiceClassName)) {
-        aclMgrClassName = atsV15HistoryACLManagerClassName;
-      }
-      if (aclMgrClassName != null) {
-        LOG.info("Using " + aclMgrClassName + " to manage Timeline ACLs");
-        try {
-          historyACLPolicyManager = ReflectionUtils.createClazzInstance(
-              aclMgrClassName);
-          historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
-        } catch (TezReflectionException e) {
-          if (!amConfig.getTezConfiguration().getBoolean(
-              TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
-              TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
-            LOG.warn("Could not instantiate object for " + aclMgrClassName
-                + ". ACLs cannot be enforced correctly for history data in Timeline", e);
-            throw e;
-          }
-          historyACLPolicyManager = null;
-        }
-      }
-    }
-
     if (this.amConfig.getTezConfiguration().getBoolean(
         TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
         TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) {
@@ -476,7 +422,7 @@ public class TezClient {
                 sessionAppId,
                 null, clientName, amConfig,
                 tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
-                historyACLPolicyManager, servicePluginsDescriptor, javaOptsChecker);
+                servicePluginsDescriptor, javaOptsChecker);
   
         // Set Tez Sessions to not retry on AM crashes if recovery is disabled
         if (!amConfig.getTezConfiguration().getBoolean(
@@ -511,7 +457,6 @@ public class TezClient {
    *           if submission timed out
    */  
   public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException {
-    ++dagCounter;
     if (isSession) {
       return submitDAGSession(dag);
     } else {
@@ -545,32 +490,9 @@ public class TezClient {
     }
 
     TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration());
-    Map<String, String> aclConfigs = null;
-    // TEZ_AM_HISTORY_LOGGING_ENABLED is a config setting enable/disable logging of all
-    // dags within a session
-    boolean sessionHistoryLoggingEnabled = amConfig.getTezConfiguration().getBoolean(
-        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-        TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
-    if (historyACLPolicyManager != null && sessionHistoryLoggingEnabled) {
-      try {
-        aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
-            amConfig.getTezConfiguration(), sessionAppId,
-            Integer.toString(dagCounter), dag.getDagAccessControls());
-      } catch (HistoryACLPolicyException e) {
-        LOG.warn("Disabling history logging for dag " +
-          dag.getName() + " due to error in setting up history acls " + e);
-        dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
-        dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false);
-      }
-    } else if (!sessionHistoryLoggingEnabled) {
-      dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
-      dagClientConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, false);
-    }
-
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
     DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
-        usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor,
-        javaOptsChecker);
+        usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
 
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
     requestBuilder.setDAGPlan(dagPlan);
@@ -644,10 +566,6 @@ public class TezClient {
    */
   public synchronized void stop() throws TezException, IOException {
     try {
-      if (historyACLPolicyManager != null) {
-        historyACLPolicyManager.close();
-      }
-
       if (sessionStarted) {
         LOG.info("Shutting down Tez Session"
             + ", sessionName=" + clientName
@@ -1032,8 +950,7 @@ public class TezClient {
       ApplicationSubmissionContext appContext = TezClientUtils
           .createApplicationSubmissionContext(
               appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
-              usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager,
-              servicePluginsDescriptor, javaOptsChecker);
+              usingTezArchiveDeploy, apiVersionInfo, servicePluginsDescriptor, javaOptsChecker);
       String callerContextStr = "";
       if (dag.getCallerContext() != null) {
         callerContextStr = ", callerContext=" + dag.getCallerContext().contextAsSimpleString();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index eb1a95e..f440b6f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -86,8 +86,6 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.security.ACLManager;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
-import org.apache.tez.common.security.HistoryACLPolicyException;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -445,7 +443,6 @@ public class TezClientUtils {
    * @param amConfig AM Configuration
    * @param tezJarResources Resources to be used by the AM
    * @param sessionCreds the credential object which will be populated with session specific
-   * @param historyACLPolicyManager
    * @param servicePluginsDescriptor descriptor for services which may be running in the AM
    * @return an ApplicationSubmissionContext to launch a Tez AM
    * @throws IOException
@@ -457,7 +454,7 @@ public class TezClientUtils {
       ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
       Credentials sessionCreds, boolean tezLrsAsArchive,
-      TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager,
+      TezApiVersionInfo apiVersionInfo,
       ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
       throws IOException, YarnException {
 
@@ -565,41 +562,19 @@ public class TezClientUtils {
     }
     amLocalResources.putAll(tezJarResources);
 
-    // Setup Session ACLs and update conf as needed
-    Map<String, String> aclConfigs = null;
-    if (historyACLPolicyManager != null) {
-      if (dag == null) {
-        try{
-          aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
-              appId);
-        } catch (HistoryACLPolicyException e) {
-          LOG.warn("Disabling history logging for session " + strAppId +
-                   " due to error in setting up history acls " + e);
-          amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-              false);
-        }
-      } else {
-        try{
-          // Non-session mode
-          // As only a single DAG is support, we should combine AM and DAG ACLs under the same
-          // acl management layer
-          aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
-              appId, dag.getDagAccessControls());
-        } catch (HistoryACLPolicyException e) {
-          LOG.warn("Disabling history logging for dag " +
-              dag.getName() + " due to error in setting up history acls " + e);
-          dag.setConf(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, "false");
-          // This is non-session mode so disable logging for whole AM
-          amConfig.getTezConfiguration().setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-              false);
-        }
-      }
+    TezConfiguration tezConf = amConfig.getTezConfiguration();
+    // Merge the dag access controls into tez am config.
+    if (dag != null && dag.getDagAccessControls() != null) {
+      // Merge updates the conf object passed. In non session mode, same client object can be used
+      // to submit multiple dags, copying this prevents ACL of one DAG from being used in another.
+      tezConf = new TezConfiguration(amConfig.getTezConfiguration());
+      dag.getDagAccessControls().mergeIntoAmAcls(tezConf);
     }
 
     // emit conf as PB file
-    ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
-        aclConfigs, servicePluginsDescriptor);
-    
+    ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
+        servicePluginsDescriptor);
+
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
       amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
@@ -737,20 +712,10 @@ public class TezClientUtils {
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
       Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor,
       JavaOptsChecker javaOptsChecker) throws IOException {
-    return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
-        null, servicePluginsDescriptor, javaOptsChecker);
-  }
-
-  static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
-      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
-      Credentials credentials, Map<String, String> additionalDAGConfigs,
-      ServicePluginsDescriptor servicePluginsDescriptor,
-      JavaOptsChecker javaOptsChecker) throws IOException {
     Credentials dagCredentials = setupDAGCredentials(dag, credentials,
         amConfig.getTezConfiguration());
     return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
-        amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor,
-        javaOptsChecker);
+        amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker);
   }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -829,7 +794,7 @@ public class TezClientUtils {
   }
 
   static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
-    Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) {
+    ServicePluginsDescriptor servicePluginsDescriptor) {
     assert amConf != null;
     ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
     for (Entry<String, String> entry : amConf) {
@@ -838,14 +803,6 @@ public class TezClientUtils {
       kvp.setValue(amConf.get(entry.getKey()));
       builder.addConfKeyValues(kvp);
     }
-    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
-      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        builder.addConfKeyValues(kvp);
-      }
-    }
 
     AMPluginDescriptorProto pluginDescriptorProto =
         DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
index e1c7314..08680a5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
@@ -26,15 +26,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 /**
  * Class to manage ACLs for the Tez AM and DAGs and provides functionality to check whether
@@ -42,8 +42,6 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @Private
 public class ACLManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ACLManager.class);
   public static final String WILDCARD_ACL_VALUE = "*";
 
   private final String dagUser;
@@ -75,7 +73,7 @@ public class ACLManager {
     }
   }
 
-  public ACLManager(ACLManager amACLManager, String dagUser, Configuration dagConf) {
+  public ACLManager(ACLManager amACLManager, String dagUser, ACLInfo aclInfo) {
     this.amUser = amACLManager.amUser;
     this.dagUser = dagUser;
     this.users = amACLManager.users;
@@ -84,12 +82,21 @@ public class ACLManager {
     if (!aclsEnabled) {
       return;
     }
-    ACLConfigurationParser parser = new ACLConfigurationParser(dagConf, true);
-    if (parser.getAllowedUsers() != null) {
-      this.users.putAll(parser.getAllowedUsers());
+    if (aclInfo.getUsersWithViewAccessCount() > 0) {
+      this.users.put(ACLType.DAG_VIEW_ACL,
+          Sets.newLinkedHashSet(aclInfo.getUsersWithViewAccessList()));
     }
-    if (parser.getAllowedGroups() != null) {
-      this.groups.putAll(parser.getAllowedGroups());
+    if (aclInfo.getUsersWithModifyAccessCount() > 0) {
+      this.users.put(ACLType.DAG_MODIFY_ACL,
+          Sets.newLinkedHashSet(aclInfo.getUsersWithModifyAccessList()));
+    }
+    if (aclInfo.getGroupsWithViewAccessCount() > 0) {
+      this.groups.put(ACLType.DAG_VIEW_ACL,
+          Sets.newLinkedHashSet(aclInfo.getGroupsWithViewAccessList()));
+    }
+    if (aclInfo.getGroupsWithModifyAccessCount() > 0) {
+      this.groups.put(ACLType.DAG_MODIFY_ACL,
+          Sets.newLinkedHashSet(aclInfo.getGroupsWithModifyAccessList()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
index 5fe352a..520416d 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
@@ -27,8 +27,11 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 
+import com.google.common.collect.ImmutableMap;
+
 /**
  * Access controls for the DAG
  */
@@ -150,23 +153,37 @@ public class DAGAccessControls {
     return Collections.unmodifiableSet(groupsWithModifyACLs);
   }
 
+  /**
+   * Merge the dag acls with the AM acls in the configuration object. The config object will contain
+   * the updated acls.
+   * @param conf The AM config.
+   */
   @Private
-  public synchronized void serializeToConfiguration(Configuration conf) {
-    if (usersWithViewACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
-      conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE);
+  public synchronized void mergeIntoAmAcls(Configuration conf) {
+    ACLConfigurationParser parser = new ACLConfigurationParser(conf, false);
+    parser.addAllowedGroups(ImmutableMap.of(
+        ACLType.AM_VIEW_ACL, groupsWithViewACLs, ACLType.AM_MODIFY_ACL, groupsWithModifyACLs));
+    parser.addAllowedUsers(ImmutableMap.of(
+        ACLType.AM_VIEW_ACL, usersWithViewACLs, ACLType.AM_MODIFY_ACL, usersWithModifyACLs));
+
+    Set<String> viewUsers = parser.getAllowedUsers().get(ACLType.AM_VIEW_ACL);
+    Set<String> viewGroups = parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL);
+    if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, ACLManager.WILDCARD_ACL_VALUE);
     } else {
-      String userList = ACLManager.toCommaSeparatedString(usersWithViewACLs);
-      String groupList = ACLManager.toCommaSeparatedString(groupsWithViewACLs);
-      conf.set(TezConstants.TEZ_DAG_VIEW_ACLS,
-          userList + " " + groupList);
+      String userList = ACLManager.toCommaSeparatedString(viewUsers);
+      String groupList = ACLManager.toCommaSeparatedString(viewGroups);
+      conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, userList + " " + groupList);
     }
-    if (usersWithModifyACLs.contains(ACLManager.WILDCARD_ACL_VALUE)) {
-      conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE);
+
+    Set<String> modifyUsers = parser.getAllowedUsers().get(ACLType.AM_MODIFY_ACL);
+    Set<String> modifyGroups = parser.getAllowedGroups().get(ACLType.AM_MODIFY_ACL);
+    if (modifyUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, ACLManager.WILDCARD_ACL_VALUE);
     } else {
-      String userList = ACLManager.toCommaSeparatedString(usersWithModifyACLs);
-      String groupList = ACLManager.toCommaSeparatedString(groupsWithModifyACLs);
-      conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, userList + " " + groupList);
+      String userList = ACLManager.toCommaSeparatedString(modifyUsers);
+      String groupList = ACLManager.toCommaSeparatedString(modifyGroups);
+      conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, userList + " " + groupList);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
index fc0f57c..92eea67 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -30,15 +30,21 @@ import org.apache.tez.common.security.HistoryACLPolicyException;
 
 /**
  * ACL Policy Manager
- * An instance of this implements any ACL related activity when starting a session or
- * submitting a DAG
+ * An instance of this implements any ACL related activity when starting a session or submitting a 
+ * DAG. It is used in the HistoryLoggingService to create domain ids and populate entities with
+ * domain id.
  */
 @Unstable
 @Private
 public interface HistoryACLPolicyManager extends Configurable {
 
   /**
-   * Take any necessary steps for setting up Session ACLs
+   * Take any necessary steps for setting up both Session ACLs and non session acls. This is called
+   * with the am configuration which contains the ACL information to be used to create a domain.
+   * If the method returns a value, then its assumed to be a valid domain and used as domainId.
+   * If the method returns null, acls are disabled at session level, i.e use default acls at session
+   * level.
+   * If the method throws an Exception, history logging is disabled for the entire session.
    * @param conf Configuration
    * @param applicationId Application ID for the session
    * @throws Exception
@@ -47,7 +53,7 @@ public interface HistoryACLPolicyManager extends Configurable {
       throws IOException, HistoryACLPolicyException;
 
   /**
-   * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode
+   * Not used currently.
    * @param conf Configuration
    * @param applicationId Application ID for the AM
    * @param dagAccessControls ACLs defined for the DAG being submitted
@@ -57,16 +63,26 @@ public interface HistoryACLPolicyManager extends Configurable {
       DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
 
   /**
-   * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session
+   * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session. This is
+   * called with dag configuration.
+   * If the method returns a value, then it is assumed to be valid domain and is used as a domainId
+   * for all of the dag events.
+   * If the method returns null, it falls back to session level acls.
+   * If the method throws Exception: it disables history logging for the dag events.
    * @param conf Configuration
    * @param applicationId Application ID for the AM
    * @param dagAccessControls ACLs defined for the DAG being submitted
    * @throws Exception
    */
   public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
-      String dagName, DAGAccessControls dagAccessControls) throws IOException, HistoryACLPolicyException;
-
+      String dagName, DAGAccessControls dagAccessControls)
+          throws IOException, HistoryACLPolicyException;
 
+  /**
+   * Called with a timeline entity which has to be updated with a domain id.
+   * @param timelineEntity The timeline entity which will be published.
+   * @param domainId The domainId returned by one of the setup*ACL calls.
+   */
   public void updateTimelineEntityDomain(Object timelineEntity, String domainId);
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 65321a8..f15c1fb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -770,15 +769,15 @@ public class DAG {
                            Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
                            boolean tezLrsAsArchive) {
     return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
-        null, null, null);
+        null, null);
   }
 
   // create protobuf message describing DAG
   @Private
   public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
-      boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
-      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) {
+      boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor,
+      JavaOptsChecker javaOptsChecker) {
     Deque<String> topologicalVertexStack = verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -1017,30 +1016,11 @@ public class DAG {
       dagBuilder.addEdge(edgeBuilder);
     }
 
-    ConfigurationProto.Builder confProtoBuilder =
-        ConfigurationProto.newBuilder();
     if (dagAccessControls != null) {
-      Configuration aclConf = new Configuration(false);
-      dagAccessControls.serializeToConfiguration(aclConf);
-      Iterator<Entry<String, String>> aclConfIter = aclConf.iterator();
-      while (aclConfIter.hasNext()) {
-        Entry<String, String> entry = aclConfIter.next();
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
-        confProtoBuilder.addConfKeyValues(kvp);
-      }
-    }
-    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
-      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
-        confProtoBuilder.addConfKeyValues(kvp);
-      }
+      dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls));
     }
+
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
     if (!this.dagConf.isEmpty()) {
       for (Entry<String, String> entry : this.dagConf.entrySet()) {
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 5733da8..cefe026 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -50,6 +50,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -57,6 +58,7 @@ import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -863,4 +865,27 @@ public class DagTypeConverters {
     return callerContext;
   }
 
+  public static ACLInfo convertDAGAccessControlsToProto(DAGAccessControls dagAccessControls) {
+    if (dagAccessControls == null) {
+      return null;
+    }
+    ACLInfo.Builder builder = ACLInfo.newBuilder();
+    builder.addAllUsersWithViewAccess(dagAccessControls.getUsersWithViewACLs());
+    builder.addAllUsersWithModifyAccess(dagAccessControls.getUsersWithModifyACLs());
+    builder.addAllGroupsWithViewAccess(dagAccessControls.getGroupsWithViewACLs());
+    builder.addAllGroupsWithModifyAccess(dagAccessControls.getGroupsWithModifyACLs());
+    return builder.build();
+  }
+
+  public static DAGAccessControls convertDAGAccessControlsFromProto(ACLInfo aclInfo) {
+    if (aclInfo == null) {
+      return null;
+    }
+    DAGAccessControls dagAccessControls = new DAGAccessControls();
+    dagAccessControls.setUsersWithViewACLs(aclInfo.getUsersWithViewAccessList());
+    dagAccessControls.setUsersWithModifyACLs(aclInfo.getUsersWithModifyAccessList());
+    dagAccessControls.setGroupsWithViewACLs(aclInfo.getGroupsWithViewAccessList());
+    dagAccessControls.setGroupsWithModifyACLs(aclInfo.getGroupsWithModifyAccessList());
+    return dagAccessControls;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index d016d60..c84094b 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -197,6 +197,13 @@ message CallerContextProto {
   optional string blob = 4;
 }
 
+message ACLInfo {
+  repeated string usersWithViewAccess = 1;
+  repeated string usersWithModifyAccess = 2;
+  repeated string groupsWithViewAccess = 3;
+  repeated string groupsWithModifyAccess = 4;
+}
+
 message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
@@ -208,6 +215,7 @@ message DAGPlan {
   optional string dag_info = 8;
   optional VertexExecutionContextProto default_execution_context = 9;
   optional CallerContextProto caller_context = 10;
+  optional ACLInfo aclInfo = 11;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index d49ba48..51f36a3 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -241,12 +240,10 @@ public class TestTezClient {
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
     
     TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null);
-    HistoryACLPolicyManager mockAcl = mock(HistoryACLPolicyManager.class);
 
     ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
     .thenReturn(YarnApplicationState.RUNNING);
-    client.setUpHistoryAclManager(mockAcl);
     client.start();
     verify(client.mockYarnClient, times(1)).init((Configuration)any());
     verify(client.mockYarnClient, times(1)).start();
@@ -344,9 +341,8 @@ public class TestTezClient {
           (ShutdownSessionRequestProto) any());
     }
     verify(client.mockYarnClient, times(1)).stop();
-    verify(mockAcl, times(1)).close();
   }
-  
+
   @Test (timeout=5000)
   public void testPreWarm() throws Exception {
     TezClientForTest client = configureAndCreateTezClient();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 2c69d77..49aae20 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -133,7 +131,7 @@ public class TestTezClientUtils {
   /**
    *
    */
-  @Test (timeout=5000)
+  @Test (timeout=10000)
   public void validateSetTezJarLocalResourcesDefinedExistingDirectory() throws Exception {
     URL[] cp = ((URLClassLoader)ClassLoader.getSystemClassLoader()).getURLs();
     StringBuffer buffer = new StringBuffer();
@@ -314,7 +312,7 @@ public class TestTezClientUtils {
         appId, null, "dagname",
         amConf, m,
         credentials, false,
-        new TezApiVersionInfo(), null, null, null);
+        new TezApiVersionInfo(), null, null);
     assertEquals(testpriority, appcontext.getPriority().getPriority());
   }
 
@@ -366,7 +364,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
     Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
@@ -399,7 +397,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     List<String> expectedCommands = new LinkedList<String>();
     expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -439,7 +437,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class), null, null);
+            null, null);
 
     List<String> expectedCommands = new LinkedList<String>();
     expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -626,7 +624,7 @@ public class TestTezClientUtils {
     expected.put("property1", val1);
     expected.put("property2", expVal2);
 
-    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null);
+    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String v = expected.remove(kvPair.getKey());
@@ -730,7 +728,7 @@ public class TestTezClientUtils {
       srcConf.set(entry.getKey(), entry.getValue());
     }
 
-    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null);
+    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String val = confMap.remove(kvPair.getKey());
@@ -792,7 +790,7 @@ public class TestTezClientUtils {
     Configuration conf = new Configuration(false);
     ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
 
-    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null,
+    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf,
         servicePluginsDescriptor);
 
     assertTrue(confProto.hasAmPluginDescriptor());

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
index a88e801..fc9c24e 100644
--- a/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
+++ b/tez-api/src/test/java/org/apache/tez/common/security/TestACLManager.java
@@ -20,18 +20,16 @@ package org.apache.tez.common.security;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
 
 public class TestACLManager {
 
@@ -64,7 +62,8 @@ public class TestACLManager {
     Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
     Assert.assertTrue(aclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
 
-    ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(), new Configuration(false));
+    ACLManager dagAclManager = new ACLManager(aclManager, dagUser.getShortUserName(),
+        ACLInfo.getDefaultInstance());
     user = dagUser;
     Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_VIEW_ACL));
     Assert.assertFalse(dagAclManager.checkAccess(user, ACLType.AM_MODIFY_ACL));
@@ -256,17 +255,20 @@ public class TestACLManager {
     conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyACLs);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, yarnAdminACLs);
 
-    // DAG View ACLs: user1, user4, grp3, grp4.
-    String dagViewACLs = "user6,   grp5  ";
-    // DAG Modify ACLs: user3, grp6, grp7
-    String dagModifyACLs = "user6,user5 ";
-    conf.set(TezConstants.TEZ_DAG_VIEW_ACLS, dagViewACLs);
-    conf.set(TezConstants.TEZ_DAG_MODIFY_ACLS, dagModifyACLs);
+    ACLInfo.Builder builder = ACLInfo.newBuilder();
+    // DAG View ACLs: user6, grp5
+    builder.addUsersWithViewAccess("user6");
+    builder.addGroupsWithViewAccess("grp5");
+
+    // DAG Modify ACLs: user6,user5
+    builder.addUsersWithModifyAccess("user6");
+    builder.addUsersWithModifyAccess("user7");
 
     UserGroupInformation dagUser = UserGroupInformation.createUserForTesting("dagUser", noGroups);
 
     ACLManager amAclManager = new ACLManager(currentUser.getShortUserName(), conf);
-    ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(), conf);
+    ACLManager aclManager = new ACLManager(amAclManager, dagUser.getShortUserName(),
+        builder.build());
 
     Assert.assertTrue(aclManager.checkAMViewAccess(currentUser));
     Assert.assertFalse(aclManager.checkAMViewAccess(dagUser));

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
index 6afc83b..4335a20 100644
--- a/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
+++ b/tez-api/src/test/java/org/apache/tez/common/security/TestDAGAccessControls.java
@@ -18,76 +18,14 @@
 
 package org.apache.tez.common.security;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestDAGAccessControls {
-
-  @Test(timeout = 5000)
-  public void testBasicSerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setUsersWithViewACLs(Arrays.asList("u1"))
-        .setUsersWithModifyACLs(Arrays.asList("u2"))
-        .setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals("u1 g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals("u2 g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testWildCardSerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setUsersWithViewACLs(Arrays.asList("*"))
-        .setUsersWithModifyACLs(Arrays.asList("*"))
-        .setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals("*", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testGroupsOnlySerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-    dagAccessControls.setGroupsWithViewACLs(Arrays.asList("g1"))
-        .setGroupsWithModifyACLs(Arrays.asList("g2"));
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
+import com.google.common.collect.Sets;
 
-    Assert.assertEquals(" g1", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals(" g2", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
-
-  @Test(timeout = 5000)
-  public void testEmptySerializeToConf()  {
-    DAGAccessControls dagAccessControls = new DAGAccessControls();
-
-    Configuration conf = new Configuration(false);
-    dagAccessControls.serializeToConfiguration(conf);
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertNotNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-
-    Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
-    Assert.assertEquals(" ", conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
-  }
+public class TestDAGAccessControls {
 
   @Test(timeout = 5000)
   public void testStringBasedConstructor() {
@@ -102,8 +40,107 @@ public class TestDAGAccessControls {
     Assert.assertTrue(dagAccessControls.getUsersWithModifyACLs().contains("u2"));
     Assert.assertTrue(dagAccessControls.getGroupsWithViewACLs().contains("g1"));
     Assert.assertTrue(dagAccessControls.getGroupsWithModifyACLs().contains("g2"));
+  }
 
+
+  @Test(timeout=5000)
+  public void testMergeIntoAmAcls() {
+    DAGAccessControls dagAccessControls = new DAGAccessControls("u1 g1", "u2 g2");
+    Configuration conf = new Configuration(false);
+
+    // default conf should have ACLs copied over.
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // both have unique users merged should have all
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u1 g1");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u2 g2");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u1 g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u2 g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // both have unique users merged should have all
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u3,u1 g3,g1", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u4,u2 g4,g2", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // one of the user is *, merged is always *
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // only * in the config, merged is *
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // DAG access with *, all operation yeild *
+    dagAccessControls = new DAGAccessControls("*", "*");
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    // DAG access is empty, conf should be same.
+    dagAccessControls = new DAGAccessControls("", "");
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("u3 g3", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("u4 g4", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*,u3 g3");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*,u4 g4");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
+
+    conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, "*");
+    conf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, "*");
+    dagAccessControls.mergeIntoAmAcls(conf);
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_VIEW_ACLS));
+    assertACLS("*", conf.get(TezConfiguration.TEZ_AM_MODIFY_ACLS));
   }
 
+  public void assertACLS(String expected, String obtained) {
+    if (expected.equals(obtained)) {
+      return;
+    }
 
+    String [] parts1 = expected.split(" ");
+    String [] parts2 = obtained.split(" ");
+
+    Assert.assertEquals(parts1.length, parts2.length);
+
+    Assert.assertEquals(
+        Sets.newHashSet(parts1[0].split(",")), Sets.newHashSet(parts2[0].split(",")));
+
+    if (parts1.length < 2) {
+      return;
+    }
+    Assert.assertEquals(
+        Sets.newHashSet(parts1[1].split(",")), Sets.newHashSet(parts2[1].split(",")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 005c027..8e1011f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -370,14 +370,14 @@ public class TestDAGPlan {
     dag.addVertex(v1);
 
     // Should succeed. Default context is containers.
-    dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+    dag.createDag(new TezConfiguration(false), null, null, null, true,
         servicePluginsDescriptor, null);
 
 
     // Set execute in AM should fail
     v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -386,13 +386,13 @@ public class TestDAGPlan {
 
     // Valid context
     v1.setExecutionContext(validExecContext);
-    dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+    dag.createDag(new TezConfiguration(false), null, null, null, true,
         servicePluginsDescriptor, null);
 
     // Invalid task scheduler
     v1.setExecutionContext(invalidExecContext1);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -404,7 +404,7 @@ public class TestDAGPlan {
     // Invalid ContainerLauncher
     v1.setExecutionContext(invalidExecContext2);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -416,7 +416,7 @@ public class TestDAGPlan {
     // Invalid task comm
     v1.setExecutionContext(invalidExecContext3);
     try {
-      dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+      dag.createDag(new TezConfiguration(false), null, null, null, true,
           servicePluginsDescriptor, null);
       fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
     } catch (IllegalStateException e) {
@@ -444,7 +444,8 @@ public class TestDAGPlan {
             new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
             new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
 
-    Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context);
+    Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1))
+        .setExecutionContext(v1Context);
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
         .addTaskLocalFiles(new HashMap<String, LocalResource>());
@@ -462,7 +463,7 @@ public class TestDAGPlan {
     dag.addVertex(v1).addVertex(v2).addEdge(edge);
     dag.setExecutionContext(defaultExecutionContext);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null,
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true,
         servicePluginsDescriptor, null);
 
     assertEquals(2, dagProto.getVertexCount());
@@ -502,8 +503,7 @@ public class TestDAGPlan {
     TezConfiguration conf = new TezConfiguration(false);
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:+UseParallelGC ");
     try {
-      DAGPlan dagProto = dag.createDag(conf, null, null, null, true, null, null,
-          new JavaOptsChecker());
+      dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
       fail("Expected dag creation to fail for invalid java opts");
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("Invalid/conflicting GC options"));
@@ -511,12 +511,11 @@ public class TestDAGPlan {
 
     // Should not fail as java opts valid
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:-UseParallelGC ");
-    DAGPlan dagProto1 = dag.createDag(conf, null, null, null, true, null, null,
-        new JavaOptsChecker());
+    dag.createDag(conf, null, null, null, true, null, new JavaOptsChecker());
 
     // Should not fail as no checker enabled
     conf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, "  -XX:+UseParallelGC ");
-    DAGPlan dagProto2 = dag.createDag(conf, null, null, null, true, null, null, null);
+    dag.createDag(conf, null, null, null, true, null, null);
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index c566b1a..794a597 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,9 +35,8 @@ import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.junit.Assert;
@@ -1044,21 +1044,11 @@ public class TestDAGVerify {
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
 
-    ConfigurationProto confProto = dagPlan.getDagConf();
-    boolean foundViewAcls = false;
-    boolean foundModifyAcls = false;
-
-    for (PlanKeyValuePair pair : confProto.getConfKeyValuesList()) {
-      if (pair.getKey().equals(TezConstants.TEZ_DAG_VIEW_ACLS)) {
-        foundViewAcls = true;
-        Assert.assertEquals("u1 g1", pair.getValue());
-      } else if (pair.getKey().equals(TezConstants.TEZ_DAG_MODIFY_ACLS)) {
-        foundModifyAcls = true;
-        Assert.assertEquals("*", pair.getValue());
-      }
-    }
-    Assert.assertTrue(foundViewAcls);
-    Assert.assertTrue(foundModifyAcls);
+    ACLInfo aclInfo = dagPlan.getAclInfo();
+    Assert.assertEquals(Collections.singletonList("u1"), aclInfo.getUsersWithViewAccessList());
+    Assert.assertEquals(Collections.singletonList("g1"), aclInfo.getGroupsWithViewAccessList());
+    Assert.assertEquals(Collections.singletonList("*"), aclInfo.getUsersWithModifyAccessList());
+    Assert.assertEquals(Collections.singletonList("g2"), aclInfo.getGroupsWithModifyAccessList());
   }
 
   // v1 has input initializer

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 6f795fc..dc04f2d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos.ACLInfo;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -44,6 +46,8 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TestDagTypeConverters {
 
   @Test(timeout = 5000)
@@ -208,6 +212,40 @@ public class TestDagTypeConverters {
     verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true);
   }
 
+  @Test
+  public void testAclConversions() {
+    DAGAccessControls dagAccessControls = new DAGAccessControls("u1,u2 g1,g2", "u3,u4 g3,g4");
+    ACLInfo aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+    dagAccessControls = new DAGAccessControls("u1 ", "u2 ");
+    aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+    dagAccessControls = new DAGAccessControls(" g1", " g3,g4");
+    aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+
+    dagAccessControls = new DAGAccessControls("*", "*");
+    aclInfo = DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls);
+    assertSame(dagAccessControls, aclInfo);
+    assertSame(DagTypeConverters.convertDAGAccessControlsFromProto(aclInfo), aclInfo);
+  }
+
+  private void assertSame(DAGAccessControls dagAccessControls, ACLInfo aclInfo) {
+    assertEquals(dagAccessControls.getUsersWithViewACLs(),
+        Sets.newHashSet(aclInfo.getUsersWithViewAccessList()));
+    assertEquals(dagAccessControls.getUsersWithModifyACLs(),
+        Sets.newHashSet(aclInfo.getUsersWithModifyAccessList()));
+    assertEquals(dagAccessControls.getGroupsWithViewACLs(),
+        Sets.newHashSet(aclInfo.getGroupsWithViewAccessList()));
+    assertEquals(dagAccessControls.getGroupsWithModifyACLs(),
+        Sets.newHashSet(aclInfo.getGroupsWithModifyAccessList()));
+  }
+
   private void verifyPlugins(List<TezNamedEntityDescriptorProto> entities, int expectedCount,
                              String baseString, boolean hasPayload) {
     assertEquals(expectedCount, entities.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index fd6d446..481353b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -542,7 +542,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
 
     this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(),
-        this.dagConf);
+        this.jobPlan.getAclInfo());
     // this is only for recovery in case it does not call the init transition
     this.startDAGCpuTime = appContext.getCumulativeCPUTime();
     this.startDAGGCTime = appContext.getCumulativeGCTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
index 91ffe7b..45e24ce 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -154,7 +154,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -164,18 +163,13 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
       domainId = DOMAIN_ID_PREFIX + applicationId.toString();
       createTimelineDomain(domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
-      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
     }
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
   }
 
   private Map<String, String> createDAGDomain(Configuration tezConf,
       ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
       throws IOException, HistoryACLPolicyException {
-    if (dagAccessControls == null) {
-      // No DAG specific ACLs
-      return null;
-    }
-
     String domainId =
         tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
     if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
@@ -193,7 +187,6 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
     if (domainId != null) {
       // do nothing
       LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
-      return null;
     } else {
       if (!autoCreateDomain) {
         // Error - Cannot fallback to default as that leaves ACLs open
@@ -201,14 +194,17 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
             + " Domains is disabled");
       }
 
+      // Create a domain only if dagAccessControls has been specified.
+      if (dagAccessControls == null) {
+        return null;
+      }
       domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
       createTimelineDomain(domainId, tezConf, dagAccessControls);
       LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
-      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
     }
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
   }
 
-
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/a23de498/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 2c976f5..6b3ebd7 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.ReflectionUtils;
@@ -54,6 +53,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -73,7 +73,6 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 
-import org.mockito.Matchers;
 
 public class TestATSHistoryWithACLs {
 
@@ -307,170 +306,6 @@ public class TestATSHistoryWithACLs {
   }
 
   /**
-   * test failure of domain creation during dag submittion in session mode
-   * only affect logging for that dag not following submitted dag 
-   * @throws Exception
-   */
-  @Test (timeout=50000)
-  public void testMultipleDagSession() throws Exception {
-    TezClient tezSession = null;
-    String viewAcls = "nobody nobody_group";
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = DAG.create("TezSleepProcessor");
-    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(256, 1));
-    dag.addVertex(vertex);
-    DAGAccessControls accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
-    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
-    dag.setAccessControls(accessControls);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-    tezSession.start();
-
-    //////submit first dag which fails in dag creation//////
-    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
-              atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
-    
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
-
-    //////submit second dag which succeeds in dag creation//////
-    DAG dag2 = DAG.create("TezSleepProcessor2");
-    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-       Resource.newInstance(256, 1));
-    dag2.addVertex(vertex);
-    accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
-    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
-    dag2.setAccessControls(accessControls);
-    dagClient = tezSession.submitDAG(dag2);
-    dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-                + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    Assert.assertNull(dagLogging);
-    myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
-    tezSession.stop();
-    verify(myAclPolicyManager.timelineClient, times(1)).stop();
-  }
-  
-/**
- * test failure of domain creation during dag submittion in nonsession mode
- * only affect logging for that dag not following submitted dag 
- * @throws Exception
- */
-  @Test (timeout=50000)
-  public void testMultipleDagNonSession() throws Exception {
-    TezClient tezClient = null;
-    String viewAcls = "nobody nobody_group";
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = DAG.create("TezSleepProcessor");
-    Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(256, 1));
-    dag.addVertex(vertex);
-    DAGAccessControls accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
-    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
-    dag.setAccessControls(accessControls);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    tezClient = TezClient.create("TezSleepProcessor", tezConf, false);
-    tezClient.start();
-
-    //////submit first dag which fails in dag creation//////
-    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
-              atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put Domain")).when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
-    
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezClient.setUpHistoryAclManager(myAclPolicyManager);
-
-    //////submit second dag which succeeds in dag creation//////
-    DAG dag2 = DAG.create("TezSleepProcessor2");
-    vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-           SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(256, 1));
-    dag2.addVertex(vertex);
-    accessControls = new DAGAccessControls();
-    accessControls.setUsersWithViewACLs(Collections.singleton("nobody3"));
-    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group3"));
-    dag2.setAccessControls(accessControls);
-    dagClient = tezClient.submitDAG(dag2);
-    dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-                   + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    Assert.assertNull(dagLogging);
-    myAclPolicyManager.timelineClient = spy(myAclPolicyManager.timelineClient);
-    tezClient.stop();
-    verify(myAclPolicyManager.timelineClient, times(1)).stop();
-
-  }
-  /**
    * Test Disable Logging for all dags in a session 
    * due to failure to create domain in session start
    * @throws Exception
@@ -501,19 +336,8 @@ public class TestATSHistoryWithACLs {
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
 
     tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-    ATSHistoryACLPolicyManager myAclPolicyManager = ReflectionUtils.createClazzInstance(
-            atsHistoryACLManagerClassName);
-    myAclPolicyManager.timelineClient = mock(TimelineClient.class);
-
-    doThrow(new IOException("Fail to Put Domain")).
-        when(myAclPolicyManager.timelineClient).putDomain(Matchers.<TimelineDomain>anyVararg());
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
     tezSession.start();
 
-    ///substitute back mocked timelineClient with a normal one
-    myAclPolicyManager.timelineClient = null;
-    myAclPolicyManager.setConf(tezConf);
-    tezSession.setUpHistoryAclManager(myAclPolicyManager);
     //////submit first dag //////
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -524,9 +348,7 @@ public class TestATSHistoryWithACLs {
       dagStatus = dagClient.getDAGStatus(null);
     }
     assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    String dagLogging = dag.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
- 
+
     //////submit second dag//////
     DAG dag2 = DAG.create("TezSleepProcessor2");
     vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
@@ -545,10 +367,9 @@ public class TestATSHistoryWithACLs {
       Thread.sleep(500l);
       dagStatus = dagClient.getDAGStatus(null);
     }
-    dagLogging = dag2.getDagConf().get(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED);
-    assertEquals(dagLogging, "false");
     tezSession.stop();
   }
+
   /**
    * use mini cluster to verify data do not push to ats when the daglogging flag
    * in dagsubmittedevent is set off
@@ -559,6 +380,9 @@ public class TestATSHistoryWithACLs {
     ATSHistoryLoggingService historyLoggingService;
     historyLoggingService =
         ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+    historyLoggingService.setAppContext(appContext);
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
     String viewAcls = "nobody nobody_group";
     tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -568,8 +392,8 @@ public class TestATSHistoryWithACLs {
         .nextInt(100000))));
     remoteFs.mkdirs(remoteStagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-    historyLoggingService.serviceInit(tezConf);
-    historyLoggingService.serviceStart();
+    historyLoggingService.init(tezConf);
+    historyLoggingService.start();
     ApplicationId appId = ApplicationId.newInstance(100l, 1);
     TezDAGID tezDAGID = TezDAGID.getInstance(
                         appId, 100);
@@ -601,6 +425,9 @@ public class TestATSHistoryWithACLs {
     ATSHistoryLoggingService historyLoggingService;
     historyLoggingService =
             ReflectionUtils.createClazzInstance(ATSHistoryLoggingService.class.getName());
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(ApplicationId.newInstance(0, 1));
+    historyLoggingService.setAppContext(appContext);
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
     String viewAcls = "nobody nobody_group";
     tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
@@ -610,8 +437,8 @@ public class TestATSHistoryWithACLs {
         .nextInt(100000))));
     remoteFs.mkdirs(remoteStagingDir);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-    historyLoggingService.serviceInit(tezConf);
-    historyLoggingService.serviceStart();
+    historyLoggingService.init(tezConf);
+    historyLoggingService.start();
     ApplicationId appId = ApplicationId.newInstance(100l, 1);
     TezDAGID tezDAGID = TezDAGID.getInstance(
                         appId, 11);
@@ -636,7 +463,7 @@ public class TestATSHistoryWithACLs {
     assertEquals(entity.getEntityType(), "TEZ_DAG_ID");
     assertEquals(entity.getEvents().get(0).getEventType(), HistoryEventType.DAG_SUBMITTED.toString());
   }
-  
+
   private static final String atsHistoryACLManagerClassName =
       "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
   @Test (timeout=50000)
@@ -676,5 +503,4 @@ public class TestATSHistoryWithACLs {
     }
   }
 
-
 }