You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/12/12 01:36:07 UTC
[2/2] tez git commit: TEZ-1696. Make Tez use the domain-based
timeline ACLs. (hitesh)
TEZ-1696. Make Tez use the domain-based timeline ACLs. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bdf0307e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bdf0307e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bdf0307e
Branch: refs/heads/master
Commit: bdf0307e75cb366f5b96b709e0773c51fc77baa5
Parents: c82afb2
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Dec 11 16:35:20 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Dec 11 16:35:20 2014 -0800
----------------------------------------------------------------------
BUILDING.txt | 19 +-
CHANGES.txt | 1 +
pom.xml | 13 +-
.../java/org/apache/tez/client/TezClient.java | 46 ++-
.../org/apache/tez/client/TezClientUtils.java | 49 ++-
.../common/security/ACLConfigurationParser.java | 21 ++
.../apache/tez/common/security/ACLManager.java | 4 +-
.../org/apache/tez/common/security/ACLType.java | 2 +-
.../security/HistoryACLPolicyManager.java | 72 +++++
.../main/java/org/apache/tez/dag/api/DAG.java | 24 +-
.../apache/tez/dag/api/TezConfiguration.java | 18 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../dag/history/events/DAGSubmittedEvent.java | 10 +-
.../TestHistoryEventsProtoConversion.java | 2 +-
.../impl/TestHistoryEventJsonConversion.java | 2 +-
tez-dist/pom.xml | 15 +-
tez-plugins/pom.xml | 11 +-
.../tez-yarn-timeline-history-with-acls/pom.xml | 139 ++++++++
.../ats/acls/ATSHistoryACLPolicyManager.java | 234 ++++++++++++++
.../java/org/apache/tez/dag/history/logging/ats | 1 +
.../ats/acls/TestATSHistoryWithACLs.java | 324 +++++++++++++++++++
.../src/test/java/org/apache/tez/tests | 1 +
tez-plugins/tez-yarn-timeline-history/pom.xml | 63 ++++
.../logging/ats/ATSHistoryLoggingService.java | 55 +++-
.../ats/TestATSHistoryLoggingService.java | 2 +
.../ats/TestATSHistoryWithMiniCluster.java | 242 ++++++++++++++
.../ats/TestHistoryEventTimelineConversion.java | 4 +-
.../tez/tests/MiniTezClusterWithTimeline.java | 253 +++++++++++++++
.../examples/TestOrderedWordCount.java | 28 ++
.../org/apache/tez/test/MiniTezCluster.java | 3 +
30 files changed, 1626 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 0d7df9b..e27bedf 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -65,20 +65,26 @@ Building against a specific version of hadoop:
Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher.
-By default, it can be compiled against hadoop versions 2.4.0 and higher by just
+By default, it can be compiled against hadoop versions 2.6.0 and higher by just
specifying the hadoop.version. For example, to build tez against hadoop 3.0.0-SNAPSHOT
$ mvn package -Dhadoop.version=3.0.0-SNAPSHOT
-However, to compile against hadoop versions lower than 2.4.0, the hadoop24 profile needs
-to be disabled
-
- $ mvn package -Dhadoop.version=2.2.0 -P\!hadoop24
-
To skip Tests and java docs
$ mvn package -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests -Dmaven.javadoc.skip=true
+However, to build against hadoop versions lower than 2.6.0, you will need to do the
+following:
+
+For Hadoop version X where 2.4.0 <= X < 2.6.0
+
+ $ mvn package -Dhadoop.version=${X} -Phadoop24 -P\!hadoop26
+
+For Hadoop version X where X < 2.4.0
+
+ $ mvn package -Dhadoop.version=${X} -P\!hadoop24 -P\!hadoop26
+
----------------------------------------------------------------------------------
Protocol Buffer compiler:
@@ -94,7 +100,6 @@ You can also specify the path to protoc while building using -Dprotoc.path
$ mvn package -DskipTests -Dprotoc.path=/usr/local/bin/protoc
-
----------------------------------------------------------------------------------
Building the docs:
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f19ba0..52507f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1696. Make Tez use the domain-based timeline ACLs.
TEZ-1835. TestFaultTolerance#testRandomFailingTasks is timing out
TEZ-1832. TestSecureShuffle fails with NoClassDefFoundError: org/bouncycastle/x509/X509V1CertificateGenerator
TEZ-1672. Update jetty to use stable 7.x version - 7.6.16.v20140903.
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8d1e037..fa50b17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<properties>
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<clover.license>${user.home}/clover.license</clover.license>
- <hadoop.version>2.4.0</hadoop.version>
+ <hadoop.version>2.6.0</hadoop.version>
<jetty.version>7.6.16.v20140903</jetty.version>
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -137,6 +137,12 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
<artifactId>tez-dag</artifactId>
<version>${project.version}</version>
</dependency>
@@ -527,6 +533,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.9</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index fc70b48..ab6e12d 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -107,9 +110,14 @@ public class TezClient {
new JobTokenSecretManager();
private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
private TezApiVersionInfo apiVersionInfo;
+ private HistoryACLPolicyManager historyACLPolicyManager;
private int preWarmDAGCounter = 0;
+ private static final String atsHistoryLoggingServiceClassName =
+ "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
+ private static final String atsHistoryACLManagerClassName =
+ "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
@@ -139,6 +147,7 @@ public class TezClient {
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
this.apiVersionInfo = new TezApiVersionInfo();
+
LOG.info("Tez Client Version: " + apiVersionInfo.toString());
}
@@ -288,7 +297,27 @@ public class TezClient {
frameworkClient = createFrameworkClient();
frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
- frameworkClient.start();
+ frameworkClient.start();
+
+ if (this.amConfig.getTezConfiguration().get(
+ TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+ .equals(atsHistoryLoggingServiceClassName)) {
+ LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+ try {
+ historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+ atsHistoryACLManagerClassName);
+ historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
+ } catch (TezUncheckedException e) {
+ LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+ + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+ if (!amConfig.getTezConfiguration().getBoolean(
+ TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+ TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
+ throw e;
+ }
+ historyACLPolicyManager = null;
+ }
+ }
if (isSession) {
LOG.info("Session mode. Starting session.");
@@ -314,7 +343,8 @@ public class TezClient {
TezClientUtils.createApplicationSubmissionContext(
sessionAppId,
null, clientName, amConfig,
- tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo);
+ tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
+ historyACLPolicyManager);
// Set Tez Sessions to not retry on AM crashes if recovery is disabled
if (!amConfig.getTezConfiguration().getBoolean(
@@ -373,10 +403,16 @@ public class TezClient {
+ lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
}
}
-
+
+ Map<String, String> aclConfigs = null;
+ if (historyACLPolicyManager != null) {
+ aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
+ amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls());
+ }
+
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
- usingTezArchiveDeploy, sessionCredentials);
+ usingTezArchiveDeploy, sessionCredentials, aclConfigs);
SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
requestBuilder.setDAGPlan(dagPlan).build();
@@ -690,7 +726,7 @@ public class TezClient {
ApplicationSubmissionContext appContext = TezClientUtils
.createApplicationSubmissionContext(
appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
- usingTezArchiveDeploy, apiVersionInfo);
+ usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager);
LOG.info("Submitting DAG to YARN"
+ ", applicationId=" + appId
+ ", dagName=" + dag.getName());
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index de4bdd0..f5f8548 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -78,6 +78,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
@@ -398,6 +399,7 @@ public class TezClientUtils {
* @param amConfig AM Configuration
* @param tezJarResources Resources to be used by the AM
* @param sessionCreds the credential object which will be populated with session specific
+ * @param historyACLPolicyManager manager used to set up history ACLs; may be null to skip
* @return an ApplicationSubmissionContext to launch a Tez AM
* @throws IOException
* @throws YarnException
@@ -406,7 +408,8 @@ public class TezClientUtils {
ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
Credentials sessionCreds, boolean tezLrsAsArchive,
- TezApiVersionInfo apiVersionInfo) throws IOException, YarnException{
+ TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager)
+ throws IOException, YarnException {
Preconditions.checkNotNull(sessionCreds);
TezConfiguration conf = amConfig.getTezConfiguration();
@@ -512,8 +515,24 @@ public class TezClientUtils {
}
amLocalResources.putAll(tezJarResources);
+ // Setup Session ACLs and update conf as needed
+ Map<String, String> aclConfigs = null;
+ if (historyACLPolicyManager != null) {
+ if (dag == null) {
+ aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
+ appId);
+ } else {
+ // Non-session mode
+ // As only a single DAG is supported, we should combine AM and DAG ACLs under the same
+ // acl management layer
+ aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
+ appId, dag.getDagAccessControls());
+ }
+ }
+
// emit conf as PB file
- ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration());
+ ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
+ aclConfigs);
FSDataOutputStream amConfPBOutBinaryStream = null;
try {
@@ -635,9 +654,17 @@ public class TezClientUtils {
static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials) throws IOException {
- Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
+ return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
+ null);
+ }
+
+ static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
+ Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
+ Credentials credentials, Map<String, String> additionalDAGConfigs) throws IOException {
+ Credentials dagCredentials = setupDAGCredentials(dag, credentials,
+ amConfig.getTezConfiguration());
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
- amConfig.getBinaryConfLR(), tezLrsAsArchive);
+ amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs);
}
static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -699,6 +726,11 @@ public class TezClientUtils {
}
static ConfigurationProto createFinalConfProtoForApp(Configuration amConf) {
+ return createFinalConfProtoForApp(amConf, null);
+ }
+
+ static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
+ Map<String, String> additionalConfigs) {
assert amConf != null;
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
for (Entry<String, String> entry : amConf) {
@@ -707,9 +739,18 @@ public class TezClientUtils {
kvp.setValue(entry.getValue());
builder.addConfKeyValues(kvp);
}
+ if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
+ for (Entry<String, String> entry : additionalConfigs.entrySet()) {
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(entry.getKey());
+ kvp.setValue(entry.getValue());
+ builder.addConfKeyValues(kvp);
+ }
+ }
return builder.build();
}
+
/**
* Helper function to create a YARN LocalResource
* @param fs FileSystem object
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java
index ff35af8..e6c4101 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
@@ -128,4 +129,24 @@ public class ACLConfigurationParser {
return Collections.unmodifiableMap(allowedGroups);
}
+ public void addAllowedUsers(Map<ACLType, Set<String>> additionalAllowedUsers) {
+ for (Entry<ACLType, Set<String>> entry : additionalAllowedUsers.entrySet()) {
+ if (allowedUsers.containsKey(entry.getKey())) {
+ allowedUsers.get(entry.getKey()).addAll(entry.getValue());
+ } else {
+ allowedUsers.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public void addAllowedGroups(Map<ACLType, Set<String>> additionalAllowedGroups) {
+ for (Entry<ACLType, Set<String>> entry : additionalAllowedGroups.entrySet()) {
+ if (allowedGroups.containsKey(entry.getKey())) {
+ allowedGroups.get(entry.getKey()).addAll(entry.getValue());
+ } else {
+ allowedGroups.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
index f83678b..c6a8f26 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
@@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
public class ACLManager {
private static final Log LOG = LogFactory.getLog(ACLManager.class);
- static final String WILDCARD_ACL_VALUE = "*";
+ public static final String WILDCARD_ACL_VALUE = "*";
private final String dagUser;
private final String amUser;
@@ -218,7 +218,7 @@ public class ACLManager {
return acls;
}
- static String toCommaSeparatedString(Collection<String> collection) {
+ public static String toCommaSeparatedString(Collection<String> collection) {
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String s : collection) {
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/common/security/ACLType.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/ACLType.java b/tez-api/src/main/java/org/apache/tez/common/security/ACLType.java
index 854e928..fd00f22 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/ACLType.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/ACLType.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
* ACL Types
*/
@Private
-enum ACLType {
+public enum ACLType {
/** View permissions on the Application Master */
AM_VIEW_ACL,
/** Modify permissions on the Application Master */
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
new file mode 100644
index 0000000..a3b62ec
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.security;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * ACL Policy Manager
+ * An instance of this implements any ACL related activity when starting a session or
+ * submitting a DAG
+ */
+@Unstable
+@Private
+public interface HistoryACLPolicyManager extends Configurable {
+
+ /**
+ * Take any necessary steps for setting up Session ACLs
+ * @param conf Configuration
+ * @param applicationId Application ID for the session
+ * @throws IOException
+ */
+ public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
+ throws IOException;
+
+ /**
+ * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode
+ * @param conf Configuration
+ * @param applicationId Application ID for the AM
+ * @param dagAccessControls ACLs defined for the DAG being submitted
+ * @throws IOException
+ */
+ public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId applicationId,
+ DAGAccessControls dagAccessControls) throws IOException;
+
+ /**
+ * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session
+ * @param conf Configuration
+ * @param applicationId Application ID for the AM
+ * @param dagAccessControls ACLs defined for the DAG being submitted
+ * @throws IOException
+ */
+ public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
+ String dagName, DAGAccessControls dagAccessControls) throws IOException;
+
+
+ public void updateTimelineEntityDomain(Object timelineEntity, String domainId);
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 91b468d..b4fdbd2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -84,6 +84,7 @@ public class DAG {
Credentials credentials = new Credentials();
Set<VertexGroup> vertexGroups = Sets.newHashSet();
Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
+
private DAGAccessControls dagAccessControls;
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
@@ -198,6 +199,11 @@ public class DAG {
return this;
}
+ @Private
+ public synchronized DAGAccessControls getDagAccessControls() {
+ return dagAccessControls;
+ }
+
/**
* One of the methods that can be used to provide information about required
* Credentials when running on a secure cluster. A combination of this and
@@ -606,10 +612,18 @@ public class DAG {
}
// create protobuf message describing DAG
+ public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+ Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
+ boolean tezLrsAsArchive) {
+ return createDag(dagConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
+ null);
+ }
+
+ // create protobuf message describing DAG
@Private
public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
- boolean tezLrsAsArchive) {
+ boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -829,6 +843,14 @@ public class DAG {
confProtoBuilder.addConfKeyValues(kvp);
}
}
+ if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
+ for (Entry<String, String> entry : additionalConfigs.entrySet()) {
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(entry.getKey());
+ kvp.setValue(entry.getValue());
+ confProtoBuilder.addConfKeyValues(kvp);
+ }
+ }
dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
// should this replace BINARY_PB_CONF???
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 06c7008..9f717a7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -791,6 +791,16 @@ public class TezConfiguration extends Configuration {
+ "yarn.ats.max.polling.time.per.event.millis";
public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
+
+ public static final String YARN_ATS_ACL_DOMAINS_AUTO_CREATE = TEZ_PREFIX
+ + "yarn.ats.acl.domains.auto-create";
+ public static final boolean YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT = true;
+
+ public static final String YARN_ATS_ACL_SESSION_DOMAIN_ID = TEZ_PREFIX
+ + "yarn.ats.acl.session.domain.id";
+ public static final String YARN_ATS_ACL_DAG_DOMAIN_ID = TEZ_PREFIX
+ + "yarn.ats.acl.dag.domain.id";
+
/**
* Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
* incomplete DAGs from the previous instance of the app master.
@@ -900,6 +910,12 @@ public class TezConfiguration extends Configuration {
+ "disable.client-version-check";
public static final boolean TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT = false;
-
+ /**
+ * Boolean value.
+ * Allow disabling of Timeline Domains even if Timeline is being used.
+ */
+ public static final String TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS = TEZ_PREFIX
+ + "allow.disabled.timeline-domains";
+ public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 789de24..35483a6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1922,7 +1922,7 @@ public class DAGAppMaster extends AbstractService {
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
- newDAG.getUserName());
+ newDAG.getUserName(), newDAG.getConf());
try {
historyEventHandler.handleCriticalEvent(
new DAGHistoryEvent(newDAG.getID(), submittedEvent));
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 7f0fab3..ab122c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -55,13 +56,15 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private String user;
private Map<String, LocalResource> cumulativeAdditionalLocalResources;
+ private Configuration conf;
+
public DAGSubmittedEvent() {
}
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
Map<String, LocalResource> cumulativeAdditionalLocalResources,
- String user) {
+ String user, Configuration conf) {
this.dagID = dagID;
this.dagName = dagPlan.getName();
this.submitTime = submitTime;
@@ -69,6 +72,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
this.applicationAttemptId = applicationAttemptId;
this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
this.user = user;
+ this.conf = conf;
}
@Override
@@ -183,4 +187,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
return user;
}
+ public Configuration getConf() {
+ return conf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index cd770a3..3b6c3ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -167,7 +167,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), null, "");
+ ApplicationId.newInstance(0, 1), 1), null, "", null);
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index e0f8c21..db8add7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -116,7 +116,7 @@ public class TestHistoryEventJsonConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user);
+ null, user, null);
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index 85738b8..955e5dd 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -54,7 +54,7 @@
<profile>
<id>hadoop24</id>
<activation>
- <activeByDefault>true</activeByDefault>
+ <activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
@@ -64,6 +64,19 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>hadoop26</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<build>
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
index b6ce081..1b0ebbe 100644
--- a/tez-plugins/pom.xml
+++ b/tez-plugins/pom.xml
@@ -30,12 +30,21 @@
<profile>
<id>hadoop24</id>
<activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>tez-yarn-timeline-history</module>
+ </modules>
+ </profile>
+ <profile>
+ <id>hadoop26</id>
+ <activation>
<property>
<name>!skipATS</name>
</property>
</activation>
<modules>
- <module>tez-yarn-timeline-history</module>
+ <module>tez-yarn-timeline-history-with-acls</module>
</modules>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml b/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml
new file mode 100644
index 0000000..f277d97
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml
@@ -0,0 +1,139 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
+
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
new file mode 100644
index 0000000..e11643f
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats.acls;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.security.ACLConfigurationParser;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.ACLType;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link HistoryACLPolicyManager} that maps Tez history ACLs onto YARN Timeline domains.
+ *
+ * <p>For a session (or a non-session application) and, optionally, for each DAG, the manager
+ * either accepts a domain id supplied through the configuration or creates a new Timeline
+ * domain. Domains created here list the current user as the only writer and the merged
+ * AM-level and DAG-level view ACLs as readers.</p>
+ *
+ * <p>{@link #setConf(Configuration)} must be called before any of the setup methods, as it is
+ * what creates and starts the underlying {@link TimelineClient}.</p>
+ */
+public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
+
+  private final static Log LOG = LogFactory.getLog(ATSHistoryACLPolicyManager.class);
+
+  TimelineClient timelineClient;
+  Configuration conf;
+  // Short name of the current user; always a reader and the sole writer of created domains.
+  String user;
+  // Prefix of every auto-generated domain id; package-visible so tests can rebuild the ids.
+  final static String DOMAIN_ID_PREFIX = "Tez_ATS_";
+
+  /**
+   * (Re)creates and starts the Timeline client from the current configuration and caches the
+   * short name of the current user. Any previously created client is stopped first.
+   *
+   * @throws TezUncheckedException if no configuration has been set or the current user cannot
+   *         be determined
+   */
+  private void initializeTimelineClient() {
+    if (this.conf == null) {
+      throw new TezUncheckedException("ATSACLManager not configured");
+    }
+    if (timelineClient != null) {
+      this.timelineClient.stop();
+      this.timelineClient = null;
+    }
+    this.timelineClient = TimelineClient.createTimelineClient();
+    this.timelineClient.init(this.conf);
+    this.timelineClient.start();
+    try {
+      this.user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      throw new TezUncheckedException("Unable to get Current User UGI", e);
+    }
+  }
+
+  /**
+   * Builds the readers string for a Timeline domain by merging the current user, the AM view
+   * ACLs from the configuration and the optional DAG-level view ACLs.
+   *
+   * @param parser parsed AM-level ACL configuration
+   * @param dagAccessControls DAG-level ACLs, may be {@code null}
+   * @return the wildcard value if any user entry is the wildcard; otherwise the users and the
+   *         groups, each formatted by {@link ACLManager#toCommaSeparatedString}, separated by
+   *         a single space
+   */
+  private String getMergedViewACLs(ACLConfigurationParser parser,
+      DAGAccessControls dagAccessControls) {
+    Map<ACLType, Set<String>> allowedUsers = parser.getAllowedUsers();
+    Map<ACLType, Set<String>> allowedGroups = parser.getAllowedGroups();
+
+    Set<String> viewUsers = new HashSet<String>();
+    viewUsers.add(user);
+    if (allowedUsers.containsKey(ACLType.AM_VIEW_ACL)) {
+      viewUsers.addAll(allowedUsers.get(ACLType.AM_VIEW_ACL));
+    }
+    if (dagAccessControls != null && dagAccessControls.getUsersWithViewACLs() != null) {
+      viewUsers.addAll(dagAccessControls.getUsersWithViewACLs());
+    }
+
+    // A wildcard user makes every other entry redundant.
+    if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      return ACLManager.WILDCARD_ACL_VALUE;
+    }
+
+    Set<String> viewGroups = new HashSet<String>();
+    if (allowedGroups.containsKey(ACLType.AM_VIEW_ACL)) {
+      viewGroups.addAll(allowedGroups.get(ACLType.AM_VIEW_ACL));
+    }
+    if (dagAccessControls != null && dagAccessControls.getGroupsWithViewACLs() != null) {
+      viewGroups.addAll(dagAccessControls.getGroupsWithViewACLs());
+    }
+
+    return ACLManager.toCommaSeparatedString(viewUsers) + " " +
+        ACLManager.toCommaSeparatedString(viewGroups);
+  }
+
+  /**
+   * Registers a Timeline domain whose readers are the merged view ACLs and whose only writer
+   * is the current user.
+   *
+   * @param domainId id of the domain to create
+   * @param tezConf configuration holding the AM-level ACLs
+   * @param dagAccessControls DAG-level ACLs, may be {@code null}
+   * @throws IOException if the Timeline client fails; a {@link YarnException} is wrapped
+   */
+  private void createTimelineDomain(String domainId, Configuration tezConf,
+      DAGAccessControls dagAccessControls) throws IOException {
+    TimelineDomain timelineDomain = new TimelineDomain();
+    timelineDomain.setId(domainId);
+
+    ACLConfigurationParser parser = new ACLConfigurationParser(tezConf, false);
+    timelineDomain.setReaders(getMergedViewACLs(parser, dagAccessControls));
+    timelineDomain.setWriters(user);
+
+    try {
+      timelineClient.putDomain(timelineDomain);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  /**
+   * Resolves the session-level Timeline domain, creating one if needed.
+   *
+   * @param tezConf configuration to read ACL and domain settings from
+   * @param applicationId application the domain is created for
+   * @param dagAccessControls DAG-level ACLs to merge into the domain readers, may be
+   *        {@code null}
+   * @return {@code null} if ACLs are disabled or a domain id was already configured; otherwise
+   *         a single-entry map from {@link TezConfiguration#YARN_ATS_ACL_SESSION_DOMAIN_ID} to
+   *         the id of the newly created domain
+   * @throws TezUncheckedException if a domain id is configured while ACLs are disabled, or if
+   *         no domain id is configured and auto-creation is disabled
+   * @throws IOException if the domain cannot be created
+   */
+  private Map<String, String> createSessionDomain(Configuration tezConf,
+      ApplicationId applicationId, DAGAccessControls dagAccessControls)
+      throws IOException {
+    String domainId =
+        tezConf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
+        TezConfiguration.TEZ_AM_ACLS_ENABLED_DEFAULT)) {
+      if (domainId != null) {
+        // This branch is only reached with ACLs disabled, so report aclsEnabled=false.
+        throw new TezUncheckedException("ACLs disabled but DomainId is specified"
+            + ", aclsEnabled=false, domainId=" + domainId);
+      }
+      return null;
+    }
+
+    boolean autoCreateDomain = tezConf.getBoolean(TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE,
+        TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT);
+
+    if (domainId != null) {
+      // A pre-configured domain is used as-is; nothing to create or propagate.
+      LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
+      return null;
+    }
+    if (!autoCreateDomain) {
+      // Error - Cannot fallback to default as that leaves ACLs open
+      throw new TezUncheckedException("Timeline DomainId is not specified and auto-create"
+          + " Domains is disabled");
+    }
+    domainId = DOMAIN_ID_PREFIX + applicationId.toString();
+    createTimelineDomain(domainId, tezConf, dagAccessControls);
+    LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
+  }
+
+  /**
+   * Resolves the DAG-level Timeline domain, creating one if needed.
+   *
+   * @param tezConf configuration to read ACL and domain settings from
+   * @param applicationId application the DAG belongs to
+   * @param dagName DAG name, appended to the generated domain id
+   * @param dagAccessControls DAG-level ACLs; when {@code null} no DAG domain is set up
+   * @return {@code null} if there are no DAG-level ACLs, ACLs are disabled, or a DAG domain id
+   *         was already configured; otherwise a single-entry map from
+   *         {@link TezConfiguration#YARN_ATS_ACL_DAG_DOMAIN_ID} to the id of the new domain
+   * @throws TezUncheckedException if a DAG domain id is configured while ACLs are disabled, or
+   *         if no domain id is configured and auto-creation is disabled
+   * @throws IOException if the domain cannot be created
+   */
+  private Map<String, String> createDAGDomain(Configuration tezConf,
+      ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
+      throws IOException {
+    if (dagAccessControls == null) {
+      // No DAG specific ACLs
+      return null;
+    }
+
+    String domainId =
+        tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+    if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
+        TezConfiguration.TEZ_AM_ACLS_ENABLED_DEFAULT)) {
+      if (domainId != null) {
+        // This branch is only reached with ACLs disabled, so report aclsEnabled=false.
+        throw new TezUncheckedException("ACLs disabled but domainId for DAG is specified"
+            + ", aclsEnabled=false, domainId=" + domainId);
+      }
+      return null;
+    }
+
+    boolean autoCreateDomain = tezConf.getBoolean(TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE,
+        TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT);
+
+    if (domainId != null) {
+      // A pre-configured domain is used as-is; nothing to create or propagate.
+      LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
+      return null;
+    }
+    if (!autoCreateDomain) {
+      // Error - Cannot fallback to default as that leaves ACLs open
+      throw new TezUncheckedException("Timeline DomainId is not specified and auto-create"
+          + " Domains is disabled");
+    }
+
+    domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
+    createTimelineDomain(domainId, tezConf, dagAccessControls);
+    LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
+    return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
+  }
+
+
+  /**
+   * Stores the configuration and (re)initializes the Timeline client from it.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    initializeTimelineClient();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Sets up the session-level domain using only the AM-level ACLs.
+   */
+  @Override
+  public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
+      throws IOException {
+    return createSessionDomain(conf, applicationId, null);
+  }
+
+  /**
+   * Sets up a single application-level domain whose readers also include the DAG-level ACLs.
+   */
+  @Override
+  public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId applicationId,
+      DAGAccessControls dagAccessControls) throws IOException {
+    return createSessionDomain(conf, applicationId, dagAccessControls);
+  }
+
+  /**
+   * Sets up a DAG-specific domain for a DAG submitted to a session.
+   */
+  @Override
+  public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
+      String dagName, DAGAccessControls dagAccessControls) throws IOException {
+    return createDAGDomain(conf, applicationId, dagName, dagAccessControls);
+  }
+
+  /**
+   * Assigns the given domain id to a Timeline entity.
+   *
+   * @param timelineEntity entity to update; must be a {@link TimelineEntity}
+   * @param domainId domain id to set on the entity
+   * @throws UnsupportedOperationException if {@code timelineEntity} is {@code null} or is not a
+   *         {@link TimelineEntity}
+   */
+  @Override
+  public void updateTimelineEntityDomain(Object timelineEntity, String domainId) {
+    if (!(timelineEntity instanceof TimelineEntity)) {
+      // instanceof is false for null, so avoid calling getClass() on it.
+      String actualType = (timelineEntity == null)
+          ? "null" : timelineEntity.getClass().getName();
+      throw new UnsupportedOperationException("Invalid object provided of type "
+          + actualType);
+    }
+    TimelineEntity entity = (TimelineEntity) timelineEntity;
+    entity.setDomainId(domainId);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats
new file mode 100644
index 0000000..9778443
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats
@@ -0,0 +1 @@
+../../../../../../../../../../tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
new file mode 100644
index 0000000..11e726a
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats.acls;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * End-to-end tests that run a sleep DAG on a mini Tez cluster with the Timeline server enabled
+ * and verify, through the Timeline REST API, the domains created for history ACLs and the
+ * domain ids attached to the application and DAG entities.
+ */
+public class TestATSHistoryWithACLs {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithACLs.class);
+
+  // Name used for both the Tez session and the DAG; it is part of the DAG domain id.
+  private static final String DAG_NAME = "TezSleepProcessor";
+  // AM-level view ACLs in "<users> <groups>" form.
+  private static final String AM_VIEW_ACLS = "nobody nobody_group";
+
+  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  private static String timelineAddress;
+  private Random random = new Random();
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestATSHistoryWithACLs.class.getName() + "-tmpDir";
+
+  private static String user;
+
+  /**
+   * Starts the mini DFS and mini Tez clusters and records the Timeline web address.
+   * A failure to start either cluster aborts the test class immediately.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithACLs.class.getName(),
+            1, 1, 1, true);
+        Configuration tezClusterConf = new Configuration();
+        tezClusterConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        tezClusterConf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(tezClusterConf);
+        mrrTezCluster.start();
+      } catch (Throwable e) {
+        // Fail fast, like the DFS start-up above, instead of continuing with a broken cluster.
+        throw new RuntimeException("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    user = UserGroupInformation.getCurrentUser().getShortUserName();
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  /**
+   * Stops both mini clusters, if they were started.
+   */
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    Thread.sleep(10000);
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  /**
+   * Fetches a JSON resource from the Timeline server and unmarshals it.
+   * To be replaced after Timeline has java APIs for domains.
+   *
+   * @param url full URL of the Timeline REST resource
+   * @param clazz type to unmarshal the response entity into
+   * @return the non-null response entity
+   */
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    try {
+      WebResource resource = client.resource(url);
+
+      ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+          .get(ClientResponse.class);
+      assertEquals(200, response.getStatus());
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+      K entity = response.getEntity(clazz);
+      assertNotNull(entity);
+      return entity;
+    } finally {
+      // Release the resources held by the Jersey client once the entity has been read.
+      client.destroy();
+    }
+  }
+
+  /**
+   * Fetches a Timeline domain by id and checks that owner, readers and writers are set.
+   */
+  private TimelineDomain getDomain(String domainId) {
+    assertNotNull(timelineAddress);
+    String url = "http://" + timelineAddress + "/ws/v1/timeline/domain/" + domainId;
+    LOG.info("Getting timeline domain: " + url);
+    TimelineDomain domain = getTimelineData(url, TimelineDomain.class);
+    assertNotNull(domain);
+    assertNotNull(domain.getOwner());
+    assertNotNull(domain.getReaders());
+    assertNotNull(domain.getWriters());
+    LOG.info("TimelineDomain for id " + domainId
+        + ", owner=" + domain.getOwner()
+        + ", readers=" + domain.getReaders()
+        + ", writers=" + domain.getWriters());
+    return domain;
+  }
+
+  /**
+   * Checks that the domain readers, expected in "<users> <groups>" form, contain the current
+   * user plus the given users and groups, and do not contain the unrelated user "nobody1".
+   * Matching is by substring, as in the expected values used by these tests.
+   */
+  private void verifyDomainACLs(TimelineDomain timelineDomain,
+      Collection<String> users, Collection<String> groups) {
+    String readers = timelineDomain.getReaders();
+    int pos = readers.indexOf(" ");
+    // Without a separator substring() would fail with an unhelpful index error.
+    assertTrue("Expected readers in '<users> <groups>' form but got: " + readers, pos >= 0);
+    String readerUsers = readers.substring(0, pos);
+    String readerGroups = readers.substring(pos + 1);
+
+    assertTrue(readerUsers.contains(user));
+    for (String s : users) {
+      assertTrue(readerUsers.contains(s));
+    }
+    for (String s : groups) {
+      assertTrue(readerGroups.contains(s));
+    }
+
+    if (!user.equals("nobody1") && !users.contains("nobody1")) {
+      assertFalse(readerUsers.contains("nobody1"));
+    }
+
+  }
+
+  /**
+   * Runs a single-vertex sleep DAG in a new Tez session with ATS history logging and the
+   * AM-level view ACLs configured, waits for it to succeed and stops the session.
+   *
+   * @param dagAccessControls DAG-level ACLs to set on the DAG, or {@code null} for none
+   * @return the application id of the session that ran the DAG
+   */
+  private ApplicationId runSleepDAG(DAGAccessControls dagAccessControls) throws Exception {
+    TezClient tezSession = null;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create(DAG_NAME);
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+      if (dagAccessControls != null) {
+        dag.setAccessControls(dagAccessControls);
+      }
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, AM_VIEW_ACLS);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create(DAG_NAME, tezConf, true);
+      tezSession.start();
+
+      ApplicationId applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500L);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+      return applicationId;
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
+  /**
+   * With only AM-level ACLs, a session domain is created and both the application and the DAG
+   * entities belong to it.
+   */
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    ApplicationId applicationId = runSleepDAG(null);
+
+    TimelineDomain timelineDomain = getDomain(
+        ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString());
+    verifyDomainACLs(timelineDomain,
+        Collections.singleton("nobody"), Collections.singleton("nobody_group"));
+
+    verifyEntityDomains(applicationId, true);
+  }
+
+  /**
+   * With DAG-level ACLs, a separate DAG domain is created whose readers merge the AM-level and
+   * DAG-level ACLs, and the DAG entity belongs to that domain.
+   */
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    DAGAccessControls accessControls = new DAGAccessControls();
+    accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
+    accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
+
+    ApplicationId applicationId = runSleepDAG(accessControls);
+
+    TimelineDomain timelineDomain = getDomain(
+        ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString());
+    verifyDomainACLs(timelineDomain,
+        Collections.singleton("nobody"), Collections.singleton("nobody_group"));
+
+    timelineDomain = getDomain(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX
+        + applicationId.toString() + "_" + DAG_NAME);
+    verifyDomainACLs(timelineDomain,
+        Sets.newHashSet("nobody", "nobody2"),
+        Sets.newHashSet("nobody_group", "nobody_group2"));
+
+    verifyEntityDomains(applicationId, false);
+  }
+
+  /**
+   * Verifies the domain ids recorded on the application entity and on the first DAG entity.
+   *
+   * @param applicationId application whose entities are checked
+   * @param sameDomain {@code true} if the DAG entity is expected to share the application's
+   *        domain, {@code false} if it is expected to be in the DAG-specific domain
+   */
+  private void verifyEntityDomains(ApplicationId applicationId, boolean sameDomain) {
+    assertNotNull(timelineAddress);
+
+    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString();
+    LOG.info("Getting timeline entity for tez application: " + appUrl);
+    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
+
+    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
+    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + tezDAGID.toString();
+    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
+    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
+
+    // The application entity always lives in the session-level domain.
+    assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString(),
+        appEntity.getDomainId());
+    if (!sameDomain) {
+      // DAG-specific ACLs: the DAG entity must be in its own domain.
+      assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString()
+          + "_" + DAG_NAME, dagEntity.getDomainId());
+    } else {
+      assertEquals(appEntity.getDomainId(), dagEntity.getDomainId());
+    }
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests
new file mode 100644
index 0000000..526cdf5
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests
@@ -0,0 +1 @@
+../../../../../../../tez-yarn-timeline-history/src/test/java/org/apache/tez/tests
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/pom.xml b/tez-plugins/tez-yarn-timeline-history/pom.xml
index f53a5e8..9b77c29 100644
--- a/tez-plugins/tez-yarn-timeline-history/pom.xml
+++ b/tez-plugins/tez-yarn-timeline-history/pom.xml
@@ -37,6 +37,17 @@
<artifactId>tez-dag</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
@@ -53,6 +64,40 @@
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</dependency>
@@ -74,6 +119,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -85,4 +135,17 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>hadoop26</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
</project>
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index ce09a3f..c68d395 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -18,9 +18,11 @@
package org.apache.tez.dag.history.logging.ats;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +34,11 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -59,12 +64,18 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
TimelineClient timelineClient;
private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+ private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>();
private long maxTimeToWaitOnShutdown;
private boolean waitForeverOnShutdown = false;
private int maxEventsPerBatch;
private long maxPollingTimeMillis;
+ private String sessionDomainId;
+ private static final String atsHistoryACLManagerClassName =
+ "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
+ private HistoryACLPolicyManager historyACLPolicyManager;
+
public ATSHistoryLoggingService() {
super(ATSHistoryLoggingService.class.getName());
}
@@ -86,6 +97,23 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (maxTimeToWaitOnShutdown < 0) {
waitForeverOnShutdown = true;
}
+ sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+
+ LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+ try {
+ historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+ atsHistoryACLManagerClassName);
+ historyACLPolicyManager.setConf(conf);
+ } catch (TezUncheckedException e) {
+ LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+ + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+ if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+ TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
+ throw e;
+ }
+ historyACLPolicyManager = null;
+ }
+
}
@Override
@@ -223,6 +251,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
skippedDAGs.add(dagId);
return false;
}
+ if (historyACLPolicyManager != null) {
+ String dagDomainId = dagSubmittedEvent.getConf().get(
+ TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+ if (dagDomainId != null) {
+ dagDomainIdMap.put(dagId, dagDomainId);
+ }
+ }
}
if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
// Remove from set to keep size small
@@ -240,13 +275,25 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
return true;
}
-
-
private void handleEvents(List<DAGHistoryEvent> events) {
TimelineEntity[] entities = new TimelineEntity[events.size()];
for (int i = 0; i < events.size(); ++i) {
- entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(
- events.get(i).getHistoryEvent());
+ DAGHistoryEvent event = events.get(i);
+ String domainId = sessionDomainId;
+ TezDAGID dagId = event.getDagID();
+
+ if (historyACLPolicyManager != null && dagId != null) {
+ if (dagDomainIdMap.containsKey(dagId)) {
+ domainId = dagDomainIdMap.get(dagId);
+ }
+ }
+
+ entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+ if (historyACLPolicyManager != null) {
+ if (domainId != null && !domainId.isEmpty()) {
+ historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId);
+ }
+ }
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index a9e00c1..18ec43e 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -61,12 +61,14 @@ public class TestATSHistoryLoggingService {
conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
1000l);
conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
+ conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
atsInvokeCounter = 0;
atsEntitiesCounter = 0;
atsHistoryLoggingService.init(conf);
atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
atsHistoryLoggingService.start();
when(appContext.getClock()).thenReturn(clock);
+ when(appContext.getCurrentDAGID()).thenReturn(null);
when(atsHistoryLoggingService.timelineClient.putEntities(
Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
new Answer<Object>() {
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
new file mode 100644
index 0000000..f3c18cf
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Runs a single-vertex sleep DAG on a mini Tez cluster that has the YARN timeline
+ * service enabled and {@link ATSHistoryLoggingService} configured as the history
+ * logging service, and checks that the DAG succeeds.
+ *
+ * <p>Both tests set {@link TezConfiguration#TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS}
+ * to {@code true}. Verification of the entities published to the timeline server
+ * ({@link #verifyEntityExistence(ApplicationId)}) is currently not invoked by
+ * either test.
+ */
+public class TestATSHistoryWithMiniCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
+
+  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  private static String timelineAddress;
+  private final Random random = new Random();
+
+  // Configuration used only for the mini DFS cluster.
+  private static final Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a two-datanode mini DFS cluster and, if not already running, a mini Tez
+   * cluster with the timeline service enabled, then records the timeline web address.
+   *
+   * @throws RuntimeException if either cluster fails to start; the original cause is
+   *         attached so the failure is reported at its source instead of surfacing
+   *         later as an unrelated error in a test method
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(
+            TestATSHistoryWithMiniCluster.class.getName(), 1, 1, 1, true);
+        // Separate configuration for the Tez/YARN cluster (distinct from the DFS one).
+        Configuration clusterConf = new Configuration();
+        clusterConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        clusterConf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(clusterConf);
+        mrrTezCluster.start();
+      } catch (Exception e) {
+        // Fail fast: continuing with a cluster that did not start only hides the cause.
+        throw new RuntimeException("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  /**
+   * Stops the mini Tez cluster and then the mini DFS cluster, if they were created.
+   */
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    // NOTE(review): fixed 10s pause before shutdown; presumably gives in-flight
+    // history events time to be published - confirm whether it is still needed.
+    Thread.sleep(10000);
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  /**
+   * Issues a GET for JSON against {@code url} and deserializes the response body.
+   * Asserts a 200 status, a JSON content type and a non-null entity.
+   *
+   * @param url fully qualified timeline REST URL
+   * @param clazz type to deserialize the response entity into
+   * @return the deserialized, non-null entity
+   */
+  // To be replaced after Timeline has java APIs for domains
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    try {
+      WebResource resource = client.resource(url);
+
+      ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+          .get(ClientResponse.class);
+      Assert.assertEquals(200, response.getStatus());
+      Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+      K entity = response.getEntity(clazz);
+      Assert.assertNotNull(entity);
+      return entity;
+    } finally {
+      // Release the client's resources; a new client is created per call.
+      client.destroy();
+    }
+  }
+
+  /**
+   * Starts a Tez session against the mini cluster, submits a one-task sleep DAG,
+   * polls every 500ms until it completes and asserts that it succeeded. The session
+   * is always stopped before returning.
+   *
+   * @return the application id of the session's application master
+   */
+  private ApplicationId runSleepDagToCompletion() throws Exception {
+    TezClient tezSession = null;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      ApplicationId applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500L);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+      return applicationId;
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    // The returned application id is what verifyEntityExistence() needs.
+    // NOTE(review): that verification was left disabled in the original change;
+    // the reason is not visible here - confirm before re-enabling.
+    runSleepDagToCompletion();
+  }
+
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    // NOTE(review): currently identical to testSimpleAMACls(); no DAG-specific
+    // settings are applied and timeline verification is disabled - confirm intent.
+    runSleepDagToCompletion();
+  }
+
+  /**
+   * Fetches the TEZ_APPLICATION entity and the TEZ_DAG_ID entity of DAG number 1
+   * for the given application from the timeline server and asserts both exist.
+   * Not invoked by any test at present.
+   *
+   * @param applicationId application whose history entities should be present
+   */
+  private void verifyEntityExistence(ApplicationId applicationId) {
+    Assert.assertNotNull(timelineAddress);
+
+    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez application: " + appUrl);
+    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
+    Assert.assertNotNull(appEntity);
+
+    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
+    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + tezDAGID.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
+    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
+    Assert.assertNotNull(dagEntity);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/bdf0307e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 0f2942c..3ba2f86 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -118,7 +118,7 @@ public class TestHistoryEventTimelineConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user);
+ null, user, null);
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -264,7 +264,7 @@ public class TestHistoryEventTimelineConversion {
long submitTime = random.nextLong();
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
- applicationAttemptId, null, user);
+ applicationAttemptId, null, user, null);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());