You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC
svn commit: r1619019 [3/10] - in
/hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/
hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/had...
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java Wed Aug 20 01:34:29 2014
@@ -150,6 +150,15 @@ public abstract class ResourceCalculator
Resource clusterResource, Resource numerator, Resource denominator);
/**
+ * Determine if a resource is not suitable for use as a divisor
+ * (will result in divide by 0, etc)
+ *
+ * @param r resource
+ * @return true if the divisor is invalid (should not be used), false otherwise
+ */
+ public abstract boolean isInvalidDivisor(Resource r);
+
+ /**
* Ratio of resource <code>a</code> to resource <code>b</code>.
*
* @param a resource
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java Wed Aug 20 01:34:29 2014
@@ -184,6 +184,11 @@ public class Resources {
return calculator.roundDown(lhs, factor);
}
+ public static boolean isInvalidDivisor(
+ ResourceCalculator resourceCalculator, Resource divisor) {
+ return resourceCalculator.isInvalidDivisor(divisor);
+ }
+
public static float ratio(
ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) {
return resourceCalculator.ratio(lhs, rhs);
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java Wed Aug 20 01:34:29 2014
@@ -19,12 +19,12 @@ package org.apache.hadoop.yarn.webapp.ut
import static org.apache.hadoop.yarn.util.StringHelper.PATH_JOINER;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -34,11 +34,18 @@ import org.apache.hadoop.http.HttpServer
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.RMHAUtils;
@Private
@Evolving
public class WebAppUtils {
+ public static final String WEB_APP_TRUSTSTORE_PASSWORD_KEY =
+ "ssl.server.truststore.password";
+ public static final String WEB_APP_KEYSTORE_PASSWORD_KEY =
+ "ssl.server.keystore.password";
+ public static final String WEB_APP_KEY_PASSWORD_KEY =
+ "ssl.server.keystore.keypassword";
public static final String HTTPS_PREFIX = "https://";
public static final String HTTP_PREFIX = "http://";
@@ -170,6 +177,37 @@ public class WebAppUtils {
return sb.toString();
}
+ /**
+ * Get the URL to use for binding where bind hostname can be specified
+ * to override the hostname in the webAppURLWithoutScheme. Port specified in the
+ * webAppURLWithoutScheme will be used.
+ *
+ * @param conf the configuration
+ * @param hostProperty bind host property name
+ * @param webAppURLWithoutScheme web app URL without scheme String
+ * @return String representing bind URL
+ */
+ public static String getWebAppBindURL(
+ Configuration conf,
+ String hostProperty,
+ String webAppURLWithoutScheme) {
+
+ // If the bind-host setting exists then it overrides the hostname
+ // portion of the corresponding webAppURLWithoutScheme
+ String host = conf.getTrimmed(hostProperty);
+ if (host != null && !host.isEmpty()) {
+ if (webAppURLWithoutScheme.contains(":")) {
+ webAppURLWithoutScheme = host + ":" + webAppURLWithoutScheme.split(":")[1];
+ }
+ else {
+ throw new YarnRuntimeException("webAppURLWithoutScheme must include port specification but doesn't: " +
+ webAppURLWithoutScheme);
+ }
+ }
+
+ return webAppURLWithoutScheme;
+ }
+
public static String getNMWebAppURLWithoutScheme(Configuration conf) {
if (YarnConfiguration.useHttps(conf)) {
return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,
@@ -242,21 +280,56 @@ public class WebAppUtils {
/**
* Load the SSL keystore / truststore into the HttpServer builder.
+ * @param builder the HttpServer2.Builder to populate with ssl config
*/
public static HttpServer2.Builder loadSslConfiguration(
HttpServer2.Builder builder) {
- Configuration sslConf = new Configuration(false);
+ return loadSslConfiguration(builder, null);
+ }
+
+ /**
+ * Load the SSL keystore / truststore into the HttpServer builder.
+ * @param builder the HttpServer2.Builder to populate with ssl config
+ * @param sslConf the Configuration instance to use during loading of SSL conf
+ */
+ public static HttpServer2.Builder loadSslConfiguration(
+ HttpServer2.Builder builder, Configuration sslConf) {
+ if (sslConf == null) {
+ sslConf = new Configuration(false);
+ }
boolean needsClientAuth = YarnConfiguration.YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
sslConf.addResource(YarnConfiguration.YARN_SSL_SERVER_RESOURCE_DEFAULT);
return builder
.needsClientAuth(needsClientAuth)
- .keyPassword(sslConf.get("ssl.server.keystore.keypassword"))
+ .keyPassword(getPassword(sslConf, WEB_APP_KEY_PASSWORD_KEY))
.keyStore(sslConf.get("ssl.server.keystore.location"),
- sslConf.get("ssl.server.keystore.password"),
+ getPassword(sslConf, WEB_APP_KEYSTORE_PASSWORD_KEY),
sslConf.get("ssl.server.keystore.type", "jks"))
.trustStore(sslConf.get("ssl.server.truststore.location"),
- sslConf.get("ssl.server.truststore.password"),
+ getPassword(sslConf, WEB_APP_TRUSTSTORE_PASSWORD_KEY),
sslConf.get("ssl.server.truststore.type", "jks"));
}
+
+ /**
+ * Leverages the Configuration.getPassword method to attempt to get
+ * passwords from the CredentialProvider API before falling back to
+ * clear text in config - if falling back is allowed.
+ * @param conf Configuration instance
+ * @param alias name of the credential to retrieve
+ * @return String credential value or null
+ */
+ static String getPassword(Configuration conf, String alias) {
+ String password = null;
+ try {
+ char[] passchars = conf.getPassword(alias);
+ if (passchars != null) {
+ password = new String(passchars);
+ }
+ }
+ catch (IOException ioe) {
+ password = null;
+ }
+ return password;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Aug 20 01:34:29 2014
@@ -71,6 +71,17 @@
</property>
<property>
+ <description>
+ The actual address the server will bind to. If this optional address is
+ set, the RPC and webapp servers will bind to this address and the port specified in
+ yarn.resourcemanager.address and yarn.resourcemanager.webapp.address, respectively. This
+ is most useful for making RM listen to all interfaces by setting to 0.0.0.0.
+ </description>
+ <name>yarn.resourcemanager.bind-host</name>
+ <value></value>
+ </property>
+
+ <property>
<description>The number of threads used to handle applications manager requests.</description>
<name>yarn.resourcemanager.client.thread-count</name>
<value>50</value>
@@ -195,6 +206,15 @@
</property>
<property>
+ <description>Flag to enable override of the default kerberos authentication
+ filter with the RM authentication filter to allow authentication using
+ delegation tokens(fallback to kerberos if the tokens are missing). Only
+ applicable when the http authentication type is kerberos.</description>
+ <name>yarn.resourcemanager.webapp.delegation-token-auth-filter.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
<description>How long to wait until a node manager is considered dead.</description>
<name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
@@ -454,7 +474,7 @@
<property>
<description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
- election fo this cluster and ensures it does not affect
+ election for this cluster and ensures it does not affect
other clusters</description>
<name>yarn.resourcemanager.cluster-id</name>
<!--value>yarn-cluster</value-->
@@ -627,6 +647,17 @@
</property>
<property>
+ <description>
+ The actual address the server will bind to. If this optional address is
+ set, the RPC and webapp servers will bind to this address and the port specified in
+ yarn.nodemanager.address and yarn.nodemanager.webapp.address, respectively. This is
+ most useful for making NM listen to all interfaces by setting to 0.0.0.0.
+ </description>
+ <name>yarn.nodemanager.bind-host</name>
+ <value></value>
+ </property>
+
+ <property>
<description>Environment variables that should be forwarded from the NodeManager's environment to the container's.</description>
<name>yarn.nodemanager.admin-env</name>
<value>MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX</value>
@@ -1164,6 +1195,18 @@
</property>
<property>
+ <description>
+ The actual address the server will bind to. If this optional address is
+ set, the RPC and webapp servers will bind to this address and the port specified in
+ yarn.timeline-service.address and yarn.timeline-service.webapp.address, respectively.
+ This is most useful for making the service listen to all interfaces by setting to
+ 0.0.0.0.
+ </description>
+ <name>yarn.timeline-service.bind-host</name>
+ <value></value>
+ </property>
+
+ <property>
<description>Store class name for timeline store.</description>
<name>yarn.timeline-service.store-class</name>
<value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java Wed Aug 20 01:34:29 2014
@@ -28,6 +28,7 @@ import java.net.SocketAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class TestYarnConfiguration {
@@ -75,4 +76,131 @@ public class TestYarnConfiguration {
YarnConfiguration.DEFAULT_NM_PORT);
assertEquals(1234, addr.getPort());
}
+
+ @Test
+ public void testGetSocketAddr() throws Exception {
+
+ YarnConfiguration conf;
+ InetSocketAddress resourceTrackerAddress;
+
+ //all default
+ conf = new YarnConfiguration();
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+ resourceTrackerAddress);
+
+ //with address
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.1");
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ "10.0.0.1",
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+ resourceTrackerAddress);
+
+ //address and socket
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5001");
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ "10.0.0.2",
+ 5001),
+ resourceTrackerAddress);
+
+ //bind host only
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_BIND_HOST, "10.0.0.3");
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ "10.0.0.3",
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+ resourceTrackerAddress);
+
+ //bind host and address no port
+ conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2");
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ "0.0.0.0",
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
+ resourceTrackerAddress);
+
+ //bind host and address with port
+ conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5003");
+ resourceTrackerAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ assertEquals(
+ new InetSocketAddress(
+ "0.0.0.0",
+ 5003),
+ resourceTrackerAddress);
+
+ }
+
+ @Test
+ public void testUpdateConnectAddr() throws Exception {
+ YarnConfiguration conf;
+ InetSocketAddress resourceTrackerConnectAddress;
+ InetSocketAddress serverAddress;
+
+ //no override, old behavior. Won't work on a host named "yo.yo.yo"
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
+ serverAddress = new InetSocketAddress(
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+ Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
+
+ resourceTrackerConnectAddress = conf.updateConnectAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ serverAddress);
+
+ assertFalse(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
+
+ //cause override with address
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
+ conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
+ serverAddress = new InetSocketAddress(
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
+ Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
+
+ resourceTrackerConnectAddress = conf.updateConnectAddr(
+ YarnConfiguration.RM_BIND_HOST,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
+ serverAddress);
+
+ assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
public class ApplicationHistoryClientService extends AbstractService {
@@ -75,10 +76,11 @@ public class ApplicationHistoryClientSer
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
- InetSocketAddress address =
- conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
+ InetSocketAddress address = conf.getSocketAddr(
+ YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
server =
rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
@@ -88,8 +90,10 @@ public class ApplicationHistoryClientSer
server.start();
this.bindAddress =
- conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
- server.getListenerAddress());
+ conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+ server.getListenerAddress());
LOG.info("Instantiated ApplicationHistoryClientService at "
+ this.bindAddress);
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ap
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,6 +28,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
@@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
@@ -59,12 +62,12 @@ public class ApplicationHistoryServer ex
private static final Log LOG = LogFactory
.getLog(ApplicationHistoryServer.class);
- protected ApplicationHistoryClientService ahsClientService;
- protected ApplicationHistoryManager historyManager;
- protected TimelineStore timelineStore;
- protected TimelineDelegationTokenSecretManagerService secretManagerService;
- protected TimelineACLsManager timelineACLsManager;
- protected WebApp webApp;
+ private ApplicationHistoryClientService ahsClientService;
+ private ApplicationHistoryManager historyManager;
+ private TimelineStore timelineStore;
+ private TimelineDelegationTokenSecretManagerService secretManagerService;
+ private TimelineDataManager timelineDataManager;
+ private WebApp webApp;
public ApplicationHistoryServer() {
super(ApplicationHistoryServer.class.getName());
@@ -72,15 +75,18 @@ public class ApplicationHistoryServer ex
@Override
protected void serviceInit(Configuration conf) throws Exception {
- historyManager = createApplicationHistory();
- ahsClientService = createApplicationHistoryClientService(historyManager);
- addService(ahsClientService);
- addService((Service) historyManager);
+ // init timeline services first
timelineStore = createTimelineStore(conf);
addIfService(timelineStore);
secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
addService(secretManagerService);
- timelineACLsManager = createTimelineACLsManager(conf);
+ timelineDataManager = createTimelineDataManager(conf);
+
+ // init generic history service afterwards
+ historyManager = createApplicationHistoryManager(conf);
+ ahsClientService = createApplicationHistoryClientService(historyManager);
+ addService(ahsClientService);
+ addService((Service) historyManager);
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
JvmMetrics.initSingleton("ApplicationHistoryServer", null);
@@ -111,21 +117,22 @@ public class ApplicationHistoryServer ex
@Private
@VisibleForTesting
- public ApplicationHistoryClientService getClientService() {
+ ApplicationHistoryClientService getClientService() {
return this.ahsClientService;
}
- protected ApplicationHistoryClientService
- createApplicationHistoryClientService(
- ApplicationHistoryManager historyManager) {
- return new ApplicationHistoryClientService(historyManager);
- }
-
- protected ApplicationHistoryManager createApplicationHistory() {
- return new ApplicationHistoryManagerImpl();
+ /**
+ * @return ApplicationTimelineStore
+ */
+ @Private
+ @VisibleForTesting
+ public TimelineStore getTimelineStore() {
+ return timelineStore;
}
- protected ApplicationHistoryManager getApplicationHistory() {
+ @Private
+ @VisibleForTesting
+ ApplicationHistoryManager getApplicationHistoryManager() {
return this.historyManager;
}
@@ -154,28 +161,35 @@ public class ApplicationHistoryServer ex
launchAppHistoryServer(args);
}
- protected ApplicationHistoryManager createApplicationHistoryManager(
+ private ApplicationHistoryClientService
+ createApplicationHistoryClientService(
+ ApplicationHistoryManager historyManager) {
+ return new ApplicationHistoryClientService(historyManager);
+ }
+
+ private ApplicationHistoryManager createApplicationHistoryManager(
Configuration conf) {
return new ApplicationHistoryManagerImpl();
}
- protected TimelineStore createTimelineStore(
+ private TimelineStore createTimelineStore(
Configuration conf) {
return ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
TimelineStore.class), conf);
}
- protected TimelineDelegationTokenSecretManagerService
+ private TimelineDelegationTokenSecretManagerService
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
return new TimelineDelegationTokenSecretManagerService();
}
- protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
- return new TimelineACLsManager(conf);
+ private TimelineDataManager createTimelineDataManager(Configuration conf) {
+ return new TimelineDataManager(
+ timelineStore, new TimelineACLsManager(conf));
}
- protected void startWebApp() {
+ private void startWebApp() {
Configuration conf = getConfig();
// Always load pseudo authentication filter to parse "user.name" in an URL
// to identify a HTTP request's user in insecure mode.
@@ -183,23 +197,41 @@ public class ApplicationHistoryServer ex
// the customized filter will be loaded by the timeline server to do Kerberos
// + DT authentication.
String initializers = conf.get("hadoop.http.filter.initializers");
+
initializers =
- initializers == null || initializers.length() == 0 ? "" : ","
- + initializers;
- if (!initializers.contains(
- TimelineAuthenticationFilterInitializer.class.getName())) {
- conf.set("hadoop.http.filter.initializers",
- TimelineAuthenticationFilterInitializer.class.getName()
- + initializers);
+ initializers == null || initializers.length() == 0 ? "" : initializers;
+
+ if (!initializers.contains(TimelineAuthenticationFilterInitializer.class
+ .getName())) {
+ initializers =
+ TimelineAuthenticationFilterInitializer.class.getName() + ","
+ + initializers;
+ }
+
+ String[] parts = initializers.split(",");
+ ArrayList<String> target = new ArrayList<String>();
+ for (String filterInitializer : parts) {
+ filterInitializer = filterInitializer.trim();
+ if (filterInitializer.equals(AuthenticationFilterInitializer.class
+ .getName())) {
+ continue;
+ }
+ target.add(filterInitializer);
+ }
+ String actualInitializers =
+ org.apache.commons.lang.StringUtils.join(target, ",");
+ if (!actualInitializers.equals(initializers)) {
+ conf.set("hadoop.http.filter.initializers", actualInitializers);
}
- String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+ String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+ YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating AHSWebApp at " + bindAddress);
try {
AHSWebApp ahsWebApp = AHSWebApp.getInstance();
ahsWebApp.setApplicationHistoryManager(historyManager);
- ahsWebApp.setTimelineStore(timelineStore);
ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
- ahsWebApp.setTimelineACLsManager(timelineACLsManager);
+ ahsWebApp.setTimelineDataManager(timelineDataManager);
webApp =
WebApps
.$for("applicationhistory", ApplicationHistoryClientService.class,
@@ -211,14 +243,6 @@ public class ApplicationHistoryServer ex
throw new YarnRuntimeException(msg, e);
}
}
- /**
- * @return ApplicationTimelineStore
- */
- @Private
- @VisibleForTesting
- public TimelineStore getTimelineStore() {
- return timelineStore;
- }
private void doSecureLogin(Configuration conf) throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Wed Aug 20 01:34:29 2014
@@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ApplicationContext;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -36,9 +35,8 @@ import com.google.common.annotations.Vis
public class AHSWebApp extends WebApp implements YarnWebParams {
private ApplicationHistoryManager applicationHistoryManager;
- private TimelineStore timelineStore;
private TimelineDelegationTokenSecretManagerService secretManagerService;
- private TimelineACLsManager timelineACLsManager;
+ private TimelineDataManager timelineDataManager;
private static AHSWebApp instance = null;
@@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im
this.applicationHistoryManager = applicationHistoryManager;
}
- public TimelineStore getTimelineStore() {
- return timelineStore;
- }
-
- public void setTimelineStore(TimelineStore timelineStore) {
- this.timelineStore = timelineStore;
- }
-
public TimelineDelegationTokenSecretManagerService
getTimelineDelegationTokenSecretManagerService() {
return secretManagerService;
@@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im
this.secretManagerService = secretManagerService;
}
- public TimelineACLsManager getTimelineACLsManager() {
- return timelineACLsManager;
+ public TimelineDataManager getTimelineDataManager() {
+ return timelineDataManager;
}
- public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
- this.timelineACLsManager = timelineACLsManager;
+ public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
+ this.timelineDataManager = timelineDataManager;
}
@Override
@@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im
bind(TimelineWebServices.class);
bind(GenericExceptionHandler.class);
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
- bind(TimelineStore.class).toInstance(timelineStore);
bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
secretManagerService);
- bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+ bind(TimelineDataManager.class).toInstance(timelineDataManager);
route("/", AHSController.class);
route(pajoin("/apps", APP_STATE), AHSController.class);
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java Wed Aug 20 01:34:29 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ti
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
@@ -141,6 +146,11 @@ public class LeveldbTimelineStore extend
"z".getBytes();
private static final byte[] EMPTY_BYTES = new byte[0];
+
+ private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
+
+ private static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 0);
@Private
@VisibleForTesting
@@ -193,6 +203,7 @@ public class LeveldbTimelineStore extend
}
LOG.info("Using leveldb path " + dbPath);
db = factory.open(new File(dbPath.toString()), options);
+ checkVersion();
startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
conf)));
@@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extend
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
}
- // warning is suppressed to prevent eclipse from noting unclosed resource
- @SuppressWarnings("resource")
@VisibleForTesting
List<String> getEntityTypes() throws IOException {
DBIterator iterator = null;
@@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extend
readOptions.fillCache(fillCache);
return db.iterator(readOptions);
}
+
+ Version loadVersion() throws IOException {
+ byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+ // if version is not stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return Version.newInstance(1, 0);
+ }
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
+ return version;
+ }
+
+ // Only used for test
+ @VisibleForTesting
+ void storeVersion(Version state) throws IOException {
+ dbStoreVersion(state);
+ }
+
+ private void dbStoreVersion(Version state) throws IOException {
+ String key = TIMELINE_STORE_VERSION_KEY;
+ byte[] data =
+ ((VersionPBImpl) state).getProto().toByteArray();
+ try {
+ db.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+ * 1) Versioning the timeline store: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of TS-store is a major upgrade, and any
+ * compatible change of TS-store is a minor upgrade.
+ * 3) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 4) Within a major upgrade, say 1.2 to 2.0:
+ * throw an exception and instruct the user to use a separate upgrade tool to
+ * upgrade the timeline store or remove the incompatible old state.
+ */
+ private void checkVersion() throws IOException {
+ Version loadedVersion = loadVersion();
+ LOG.info("Loaded timeline store version info " + loadedVersion);
+ if (loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+ LOG.info("Storing timeline store version info " + getCurrentVersion());
+ dbStoreVersion(CURRENT_VERSION_INFO);
+ } else {
+ String incompatibleMessage =
+ "Incompatible version for timeline store: expecting version "
+ + getCurrentVersion() + ", but loading version " + loadedVersion;
+ LOG.fatal(incompatibleMessage);
+ throw new IOException(incompatibleMessage);
+ }
+ }
+
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Wed Aug 20 01:34:29 2014
@@ -18,14 +18,10 @@
package org.apache.hadoop.yarn.server.timeline.webapp;
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -80,14 +73,11 @@ public class TimelineWebServices {
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
- private TimelineStore store;
- private TimelineACLsManager timelineACLsManager;
+ private TimelineDataManager timelineDataManager;
@Inject
- public TimelineWebServices(TimelineStore store,
- TimelineACLsManager timelineACLsManager) {
- this.store = store;
- this.timelineACLsManager = timelineACLsManager;
+ public TimelineWebServices(TimelineDataManager timelineDataManager) {
+ this.timelineDataManager = timelineDataManager;
}
@XmlRootElement(name = "about")
@@ -148,61 +138,28 @@ public class TimelineWebServices {
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
init(res);
- TimelineEntities entities = null;
try {
- EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
- boolean modified = extendFields(fieldEnums);
- UserGroupInformation callerUGI = getUser(req);
- entities = store.getEntities(
+ return timelineDataManager.getEntities(
parseStr(entityType),
- parseLongStr(limit),
+ parsePairStr(primaryFilter, ":"),
+ parsePairsStr(secondaryFilter, ",", ":"),
parseLongStr(windowStart),
parseLongStr(windowEnd),
parseStr(fromId),
parseLongStr(fromTs),
- parsePairStr(primaryFilter, ":"),
- parsePairsStr(secondaryFilter, ",", ":"),
- fieldEnums);
- if (entities != null) {
- Iterator<TimelineEntity> entitiesItr =
- entities.getEntities().iterator();
- while (entitiesItr.hasNext()) {
- TimelineEntity entity = entitiesItr.next();
- try {
- // check ACLs
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- entitiesItr.remove();
- } else {
- // clean up system data
- if (modified) {
- entity.setPrimaryFilters(null);
- } else {
- cleanupOwnerInfo(entity);
- }
- }
- } catch (YarnException e) {
- LOG.error("Error when verifying access for user " + callerUGI
- + " on the events of the timeline entity "
- + new EntityIdentifier(entity.getEntityId(),
- entity.getEntityType()), e);
- entitiesItr.remove();
- }
- }
- }
+ parseLongStr(limit),
+ parseFieldsStr(fields, ","),
+ getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("requested invalid field.");
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
- if (entities == null) {
- return new TimelineEntities();
- }
- return entities;
}
/**
@@ -220,33 +177,15 @@ public class TimelineWebServices {
init(res);
TimelineEntity entity = null;
try {
- EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
- boolean modified = extendFields(fieldEnums);
- entity =
- store.getEntity(parseStr(entityId), parseStr(entityType),
- fieldEnums);
- if (entity != null) {
- // check ACLs
- UserGroupInformation callerUGI = getUser(req);
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- entity = null;
- } else {
- // clean up the system data
- if (modified) {
- entity.setPrimaryFilters(null);
- } else {
- cleanupOwnerInfo(entity);
- }
- }
- }
+ entity = timelineDataManager.getEntity(
+ parseStr(entityType),
+ parseStr(entityId),
+ parseFieldsStr(fields, ","),
+ getUser(req));
} catch (IllegalArgumentException e) {
throw new BadRequestException(
"requested invalid field.");
- } catch (IOException e) {
- LOG.error("Error getting entity", e);
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- } catch (YarnException e) {
+ } catch (Exception e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
@@ -275,51 +214,23 @@ public class TimelineWebServices {
@QueryParam("windowEnd") String windowEnd,
@QueryParam("limit") String limit) {
init(res);
- TimelineEvents events = null;
try {
- UserGroupInformation callerUGI = getUser(req);
- events = store.getEntityTimelines(
+ return timelineDataManager.getEvents(
parseStr(entityType),
parseArrayStr(entityId, ","),
- parseLongStr(limit),
+ parseArrayStr(eventType, ","),
parseLongStr(windowStart),
parseLongStr(windowEnd),
- parseArrayStr(eventType, ","));
- if (events != null) {
- Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
- events.getAllEvents().iterator();
- while (eventsItr.hasNext()) {
- TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
- try {
- TimelineEntity entity = store.getEntity(
- eventsOfOneEntity.getEntityId(),
- eventsOfOneEntity.getEntityType(),
- EnumSet.of(Field.PRIMARY_FILTERS));
- // check ACLs
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- eventsItr.remove();
- }
- } catch (Exception e) {
- LOG.error("Error when verifying access for user " + callerUGI
- + " on the events of the timeline entity "
- + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
- eventsOfOneEntity.getEntityType()), e);
- eventsItr.remove();
- }
- }
- }
+ parseLongStr(limit),
+ getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
- if (events == null) {
- return new TimelineEvents();
- }
- return events;
}
/**
@@ -333,9 +244,6 @@ public class TimelineWebServices {
@Context HttpServletResponse res,
TimelineEntities entities) {
init(res);
- if (entities == null) {
- return new TimelinePutResponse();
- }
UserGroupInformation callerUGI = getUser(req);
if (callerUGI == null) {
String msg = "The owner of the posted timeline entities is not set";
@@ -343,76 +251,8 @@ public class TimelineWebServices {
throw new ForbiddenException(msg);
}
try {
- List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
- TimelineEntities entitiesToPut = new TimelineEntities();
- List<TimelinePutResponse.TimelinePutError> errors =
- new ArrayList<TimelinePutResponse.TimelinePutError>();
- for (TimelineEntity entity : entities.getEntities()) {
- EntityIdentifier entityID =
- new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
-
- // check if there is existing entity
- TimelineEntity existingEntity = null;
- try {
- existingEntity =
- store.getEntity(entityID.getId(), entityID.getType(),
- EnumSet.of(Field.PRIMARY_FILTERS));
- if (existingEntity != null
- && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
- throw new YarnException("The timeline entity " + entityID
- + " was not put by " + callerUGI + " before");
- }
- } catch (Exception e) {
- // Skip the entity which already exists and was put by others
- LOG.warn("Skip the timeline entity: " + entityID + ", because "
- + e.getMessage());
- TimelinePutResponse.TimelinePutError error =
- new TimelinePutResponse.TimelinePutError();
- error.setEntityId(entityID.getId());
- error.setEntityType(entityID.getType());
- error.setErrorCode(
- TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
- errors.add(error);
- continue;
- }
-
- // inject owner information for the access check if this is the first
- // time to post the entity, in case it's the admin who is updating
- // the timeline data.
- try {
- if (existingEntity == null) {
- injectOwnerInfo(entity, callerUGI.getShortUserName());
- }
- } catch (YarnException e) {
- // Skip the entity which messes up the primary filter and record the
- // error
- LOG.warn("Skip the timeline entity: " + entityID + ", because "
- + e.getMessage());
- TimelinePutResponse.TimelinePutError error =
- new TimelinePutResponse.TimelinePutError();
- error.setEntityId(entityID.getId());
- error.setEntityType(entityID.getType());
- error.setErrorCode(
- TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
- errors.add(error);
- continue;
- }
-
- entityIDs.add(entityID);
- entitiesToPut.addEntity(entity);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
- + TimelineUtils.dumpTimelineRecordtoJSON(entity));
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
- }
- TimelinePutResponse response = store.put(entitiesToPut);
- // add the errors of timeline system filter key conflict
- response.addErrors(errors);
- return response;
- } catch (IOException e) {
+ return timelineDataManager.postEntities(entities, callerUGI);
+ } catch (Exception e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
@@ -423,6 +263,15 @@ public class TimelineWebServices {
response.setContentType(null);
}
+ private static UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUGI;
+ }
+
private static SortedSet<String> parseArrayStr(String str, String delimiter) {
if (str == null) {
return null;
@@ -495,14 +344,6 @@ public class TimelineWebServices {
}
}
- private static boolean extendFields(EnumSet<Field> fieldEnums) {
- boolean modified = false;
- if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
- fieldEnums.add(Field.PRIMARY_FILTERS);
- modified = true;
- }
- return modified;
- }
private static Long parseLongStr(String str) {
return str == null ? null : Long.parseLong(str.trim());
}
@@ -511,34 +352,4 @@ public class TimelineWebServices {
return str == null ? null : str.trim();
}
- private static UserGroupInformation getUser(HttpServletRequest req) {
- String remoteUser = req.getRemoteUser();
- UserGroupInformation callerUGI = null;
- if (remoteUser != null) {
- callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return callerUGI;
- }
-
- private static void injectOwnerInfo(TimelineEntity timelineEntity,
- String owner) throws YarnException {
- if (timelineEntity.getPrimaryFilters() != null &&
- timelineEntity.getPrimaryFilters().containsKey(
- TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
- throw new YarnException(
- "User should not use the timeline system filter key: "
- + TimelineStore.SystemFilter.ENTITY_OWNER);
- }
- timelineEntity.addPrimaryFilter(
- TimelineStore.SystemFilter.ENTITY_OWNER
- .toString(), owner);
- }
-
- private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
- if (timelineEntity.getPrimaryFilters() != null) {
- timelineEntity.getPrimaryFilters().remove(
- TimelineStore.SystemFilter.ENTITY_OWNER.toString());
- }
- }
-
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014
@@ -69,7 +69,7 @@ public class TestApplicationHistoryClien
historyServer.init(config);
historyServer.start();
store =
- ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
+ ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
.getHistoryStore();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java Wed Aug 20 01:34:29 2014
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.fail;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
public class TestApplicationHistoryServer {
@@ -69,6 +72,31 @@ public class TestApplicationHistoryServe
}
}
+ @Test(timeout = 50000)
+ public void testFilteOverrides() throws Exception {
+
+ String[] filterInitializers =
+ {
+ AuthenticationFilterInitializer.class.getName(),
+ TimelineAuthenticationFilterInitializer.class.getName(),
+ AuthenticationFilterInitializer.class.getName() + ","
+ + TimelineAuthenticationFilterInitializer.class.getName(),
+ AuthenticationFilterInitializer.class.getName() + ", "
+ + TimelineAuthenticationFilterInitializer.class.getName() };
+ for (String filterInitializer : filterInitializers) {
+ historyServer = new ApplicationHistoryServer();
+ Configuration config = new YarnConfiguration();
+ config.set("hadoop.http.filter.initializers", filterInitializer);
+ historyServer.init(config);
+ historyServer.start();
+ Configuration tmp = historyServer.getConfig();
+ assertEquals(TimelineAuthenticationFilterInitializer.class.getName(),
+ tmp.get("hadoop.http.filter.initializers"));
+ historyServer.stop();
+ AHSWebApp.resetInstance();
+ }
+ }
+
@After
public void stop() {
if (historyServer != null) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java Wed Aug 20 01:34:29 2014
@@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.iq80.leveldb.DBIterator;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,19 +55,19 @@ import org.junit.Test;
public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
+ private Configuration config = new YarnConfiguration();
@Before
public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext();
- Configuration conf = new YarnConfiguration();
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
- conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+ config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+ config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
store = new LeveldbTimelineStore();
- store.init(conf);
+ store.init(config);
store.start();
loadTestData();
loadVerificationData();
@@ -263,5 +266,47 @@ public class TestLeveldbTimelineStore ex
assertEquals(1, getEntities("type_2").size());
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
}
+
+ @Test
+ public void testCheckVersion() throws IOException {
+ LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
+ // default version
+ Version defaultVersion = dbStore.getCurrentVersion();
+ Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+ // compatible version
+ Version compatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion(),
+ defaultVersion.getMinorVersion() + 2);
+ dbStore.storeVersion(compatibleVersion);
+ Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
+ restartTimelineStore();
+ dbStore = (LeveldbTimelineStore) store;
+ // overwrite the compatible version
+ Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+ // incompatible version
+ Version incompatibleVersion =
+ Version.newInstance(defaultVersion.getMajorVersion() + 1,
+ defaultVersion.getMinorVersion());
+ dbStore.storeVersion(incompatibleVersion);
+ try {
+ restartTimelineStore();
+ Assert.fail("Incompatible version, should expect fail here.");
+ } catch (ServiceStateException e) {
+ Assert.assertTrue("Exception message mismatch",
+ e.getMessage().contains("Incompatible version for timeline store"));
+ }
+ }
+
+ private void restartTimelineStore() throws IOException {
+ // need to close so leveldb releases database lock
+ if (store != null) {
+ store.close();
+ }
+ store = new LeveldbTimelineStore();
+ store.init(config);
+ store.start();
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Wed Aug 20 01:34:29 2014
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.ws.rs.core.MediaType;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
@@ -88,14 +90,15 @@ public class TestTimelineWebServices ext
} catch (Exception e) {
Assert.fail();
}
- bind(TimelineStore.class).toInstance(store);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
timelineACLsManager = new TimelineACLsManager(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
adminACLsManager = new AdminACLsManager(conf);
- bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+ TimelineDataManager timelineDataManager =
+ new TimelineDataManager(store, timelineACLsManager);
+ bind(TimelineDataManager.class).toInstance(timelineDataManager);
serve("/*").with(GuiceContainer.class);
TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
FilterConfig filterConfig = mock(FilterConfig.class);
@@ -105,6 +108,8 @@ public class TestTimelineWebServices ext
.thenReturn("simple");
when(filterConfig.getInitParameter(
PseudoAuthenticationHandler.ANONYMOUS_ALLOWED)).thenReturn("true");
+ ServletContext context = mock(ServletContext.class);
+ when(filterConfig.getServletContext()).thenReturn(context);
Enumeration<Object> names = mock(Enumeration.class);
when(names.hasMoreElements()).thenReturn(true, true, false);
when(names.nextElement()).thenReturn(
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Aug 20 01:34:29 2014
@@ -47,4 +47,10 @@ message NodeHealthStatusProto {
optional bool is_node_healthy = 1;
optional string health_report = 2;
optional int64 last_health_report_time = 3;
-}
\ No newline at end of file
+}
+
+message VersionProto {
+ optional int32 major_version = 1;
+ optional int32 minor_version = 2;
+}
+
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -29,17 +30,18 @@ import java.util.concurrent.locks.Reentr
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -126,9 +128,76 @@ public abstract class ContainerExecutor
public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
throws IOException, InterruptedException;
+ public abstract boolean isContainerProcessAlive(String user, String pid)
+ throws IOException;
+
+ /**
+ * Recover an already existing container. This is a blocking call and returns
+ * only when the container exits. Note that the container must have been
+ * activated prior to this call.
+ * @param user the user of the container
+ * @param containerId The ID of the container to reacquire
+ * @return The exit code of the pre-existing container
+ * @throws IOException
+ */
+ public int reacquireContainer(String user, ContainerId containerId)
+ throws IOException {
+ Path pidPath = getPidFilePath(containerId);
+ if (pidPath == null) {
+ LOG.warn(containerId + " is not active, returning terminated error");
+ return ExitCode.TERMINATED.getExitCode();
+ }
+
+ String pid = null;
+ pid = ProcessIdFileReader.getProcessId(pidPath);
+ if (pid == null) {
+ throw new IOException("Unable to determine pid for " + containerId);
+ }
+
+ LOG.info("Reacquiring " + containerId + " with pid " + pid);
+ try {
+ while(isContainerProcessAlive(user, pid)) {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while waiting for process " + pid
+ + " to exit", e);
+ }
+
+ // wait for exit code file to appear
+ String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
+ File file = new File(exitCodeFile);
+ final int sleepMsec = 100;
+ int msecLeft = 2000;
+ while (!file.exists() && msecLeft >= 0) {
+ if (!isContainerActive(containerId)) {
+ LOG.info(containerId + " was deactivated");
+ return ExitCode.TERMINATED.getExitCode();
+ }
+ try {
+ Thread.sleep(sleepMsec);
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Interrupted while waiting for exit code from " + containerId, e);
+ }
+ msecLeft -= sleepMsec;
+ }
+ if (msecLeft < 0) {
+ throw new IOException("Timeout while waiting for exit code from "
+ + containerId);
+ }
+
+ try {
+ return Integer.parseInt(FileUtils.readFileToString(file).trim());
+ } catch (NumberFormatException e) {
+ throw new IOException("Error parsing exit code from pid " + pid, e);
+ }
+ }
+
public enum ExitCode {
FORCE_KILLED(137),
- TERMINATED(143);
+ TERMINATED(143),
+ LOST(154);
private final int code;
private ExitCode(int exitCode) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -273,25 +273,57 @@ public class DefaultContainerExecutor ex
private final class UnixLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder {
+ private final Path sessionScriptPath;
public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
super(containerWorkDir);
+ this.sessionScriptPath = new Path(containerWorkDir,
+ Shell.appendScriptExtension("default_container_executor_session"));
+ }
+
+ @Override
+ public void writeLocalWrapperScript(Path launchDst, Path pidFile)
+ throws IOException {
+ writeSessionScript(launchDst, pidFile);
+ super.writeLocalWrapperScript(launchDst, pidFile);
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
-
- // We need to do a move as writing to a file is not atomic
- // Process reading a file being written to may get garbled data
- // hence write pid to tmp file first followed by a mv
+ String exitCodeFile = ContainerLaunch.getExitCodeFile(
+ pidFile.toString());
+ String tmpFile = exitCodeFile + ".tmp";
pout.println("#!/bin/bash");
- pout.println();
- pout.println("echo $$ > " + pidFile.toString() + ".tmp");
- pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
- String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
- pout.println(exec + " /bin/bash \"" +
- launchDst.toUri().getPath().toString() + "\"");
+ pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
+ pout.println("rc=$?");
+ pout.println("echo $rc > \"" + tmpFile + "\"");
+ pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
+ pout.println("exit $rc");
+ }
+
+ private void writeSessionScript(Path launchDst, Path pidFile)
+ throws IOException {
+ DataOutputStream out = null;
+ PrintStream pout = null;
+ try {
+ out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
+ pout = new PrintStream(out);
+ // We need to do a move as writing to a file is not atomic
+ // Process reading a file being written to may get garbled data
+ // hence write pid to tmp file first followed by a mv
+ pout.println("#!/bin/bash");
+ pout.println();
+ pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+ pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+ String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+ pout.println(exec + " /bin/bash \"" +
+ launchDst.toUri().getPath().toString() + "\"");
+ } finally {
+ IOUtils.cleanup(LOG, pout, out);
+ }
+ lfs.setPermission(sessionScriptPath,
+ ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
}
}
@@ -310,6 +342,7 @@ public class DefaultContainerExecutor ex
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
+ // TODO: exit code script for Windows
// On Windows, the pid is the container ID, so that it can also serve as
// the name of the job object created by winutils for task management.
@@ -342,6 +375,12 @@ public class DefaultContainerExecutor ex
return true;
}
+ @Override
+ public boolean isContainerProcessAlive(String user, String pid)
+ throws IOException {
+ return containerIsAlive(pid);
+ }
+
/**
* Returns true if the process with the specified pid is alive.
*
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Aug 20 01:34:29 2014
@@ -403,6 +403,13 @@ public class LinuxContainerExecutor exte
}
}
+ @Override
+ public boolean isContainerProcessAlive(String user, String pid)
+ throws IOException {
+ // Send a test signal to the process as the user to see if it's alive
+ return signalContainer(user, pid, Signal.NULL);
+ }
+
public void mountCgroups(List<String> cgroupKVs, String hierarchy)
throws IOException {
List<String> command = new ArrayList<String>(
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Wed Aug 20 01:34:29 2014
@@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record
public interface NodeStatusUpdater extends Service {
+ /**
+ * Schedule a heartbeat to the ResourceManager outside of the normal,
+ * periodic heartbeating process. This is typically called when the state
+ * of containers on the node has changed to notify the RM sooner.
+ */
void sendOutofBandHeartBeat();
+ /**
+ * Get the ResourceManager identifier received during registration
+ * @return the ResourceManager ID
+ */
long getRMIdentifier();
+ /**
+ * Query if a container has recently completed
+ * @param containerId the container ID
+ * @return true if the container has recently completed
+ */
public boolean isContainerRecentlyStopped(ContainerId containerId);
+ /**
+ * Add a container to the list of containers that have recently completed
+ * @param containerId the ID of the completed container
+ */
+ public void addCompletedContainer(ContainerId containerId);
+
+ /**
+ * Clear the list of recently completed containers
+ */
public void clearFinishedContainersFromCache();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Aug 20 01:34:29 2014
@@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
- updateStoppedContainersInCache(container.getContainerId());
- addCompletedContainer(container);
+ addCompletedContainer(container.getContainerId());
}
}
if (LOG.isDebugEnabled()) {
@@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl exten
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
- updateStoppedContainersInCache(container.getContainerId());
- addCompletedContainer(container);
+ addCompletedContainer(container.getContainerId());
}
}
LOG.info("Sending out " + containerStatuses.size()
@@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl exten
return containerStatuses;
}
- private void addCompletedContainer(Container container) {
+ @Override
+ public void addCompletedContainer(ContainerId containerId) {
synchronized (previousCompletedContainers) {
- previousCompletedContainers.add(container.getContainerId());
+ previousCompletedContainers.add(containerId);
+ }
+ synchronized (recentlyStoppedContainers) {
+ removeVeryOldStoppedContainersFromCache();
+ recentlyStoppedContainers.put(containerId,
+ System.currentTimeMillis() + durationToTrackStoppedContainers);
}
}
@@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl exten
}
}
- @Private
- @VisibleForTesting
- public void updateStoppedContainersInCache(ContainerId containerId) {
- synchronized (recentlyStoppedContainers) {
- removeVeryOldStoppedContainersFromCache();
- recentlyStoppedContainers.put(containerId,
- System.currentTimeMillis() + durationToTrackStoppedContainers);
- }
- }
-
@Override
public void clearFinishedContainersFromCache() {
synchronized (recentlyStoppedContainers) {
@@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl exten
Iterator<ContainerId> i =
recentlyStoppedContainers.keySet().iterator();
while (i.hasNext()) {
- if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+ ContainerId cid = i.next();
+ if (recentlyStoppedContainers.get(cid) < currentTime) {
i.remove();
+ try {
+ context.getNMStateStore().removeContainer(cid);
+ } catch (IOException e) {
+ LOG.error("Unable to remove container " + cid + " in store", e);
+ }
} else {
break;
}