You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/10/08 21:54:50 UTC
[3/6] YARN-913 service registry: YARN-2652 add hadoop-yarn-registry
package under hadoop-yarn
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
new file mode 100644
index 0000000..004be86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.integration;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.registry.server.services.RegistryAdminService;
+
+/**
+ * Select an entry by the YARN persistence policy
+ */
+public class SelectByYarnPersistence
+ implements RegistryAdminService.NodeSelector {
+ private final String id;
+ private final String targetPolicy;
+
+ public SelectByYarnPersistence(String id, String targetPolicy) {
+ Preconditions.checkArgument(!StringUtils.isEmpty(id), "id");
+ Preconditions.checkArgument(!StringUtils.isEmpty(targetPolicy),
+ "targetPolicy");
+ this.id = id;
+ this.targetPolicy = targetPolicy;
+ }
+
+ @Override
+ public boolean shouldSelect(String path,
+ RegistryPathStatus registryPathStatus,
+ ServiceRecord serviceRecord) {
+ String policy =
+ serviceRecord.get(YarnRegistryAttributes.YARN_PERSISTENCE, "");
+ return id.equals(serviceRecord.get(YarnRegistryAttributes.YARN_ID, ""))
+ && (targetPolicy.equals(policy));
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "Select by ID %s and policy %s: {}",
+ id, targetPolicy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
new file mode 100644
index 0000000..22d8bc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes which integrate with the YARN resource
+ * manager.
+ */
+package org.apache.hadoop.registry.server.integration;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
new file mode 100644
index 0000000..6962eb8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Server-side classes for the registry
+ * <p>
+ * These are components intended to be deployed only on servers or in test
+ * JVMs, rather than on client machines.
+ * <p>
+ * Example components are: server-side ZK support, a REST service, etc.
+ */
+package org.apache.hadoop.registry.server;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
new file mode 100644
index 0000000..9faede4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+
+/**
+ * Composite service that exports the add/remove methods.
+ * <p>
+ * This allows external classes to add services to these methods, after which
+ * they follow the same lifecyce.
+ * <p>
+ * It is essential that any service added is in a state where it can be moved
+ * on with that of the parent services. Specifically, do not add an uninited
+ * service to a parent that is already inited —as the <code>start</code>
+ * operation will then fail
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AddingCompositeService extends CompositeService {
+
+
+ public AddingCompositeService(String name) {
+ super(name);
+ }
+
+ @Override
+ public void addService(Service service) {
+ super.addService(service);
+ }
+
+ @Override
+ public boolean removeService(Service service) {
+ return super.removeService(service);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
new file mode 100644
index 0000000..e160d4a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Curator callback for delete operations completing.
+ * <p>
+ * This callback logs at debug and increments the event counter.
+ */
+public class DeleteCompletionCallback implements BackgroundCallback {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RMRegistryOperationsService.class);
+
+ private AtomicInteger events = new AtomicInteger(0);
+
+ @Override
+ public void processResult(CuratorFramework client,
+ CuratorEvent event) throws
+ Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delete event {}", event);
+ }
+ events.incrementAndGet();
+ }
+
+ /**
+ * Get the number of deletion events
+ * @return the count of events
+ */
+ public int getEventCount() {
+ return events.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
new file mode 100644
index 0000000..3fa0c19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.impl.zk.BindingInformation;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
+import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
+import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+/**
+ * This is a small, localhost Zookeeper service instance that is contained
+ * in a YARN service...it's been derived from Apache Twill.
+ *
+ * It implements {@link RegistryBindingSource} and provides binding information,
+ * <i>once started</i>. Until <code>start()</code> is called, the hostname &
+ * port may be undefined. Accordingly, the service raises an exception in this
+ * condition.
+ *
+ * If you wish to chain together a registry service with this one under
+ * the same <code>CompositeService</code>, this service must be added
+ * as a child first.
+ *
+ * It also sets the configuration parameter
+ * {@link RegistryConstants#KEY_REGISTRY_ZK_QUORUM}
+ * to its connection string. Any code with access to the service configuration
+ * can view it.
+ */
+@InterfaceStability.Evolving
+public class MicroZookeeperService
+ extends AbstractService
+ implements RegistryBindingSource, RegistryConstants,
+ ZookeeperConfigOptions,
+ MicroZookeeperServiceKeys{
+
+
+ private static final Logger
+ LOG = LoggerFactory.getLogger(MicroZookeeperService.class);
+
+ private File instanceDir;
+ private File dataDir;
+ private int tickTime;
+ private int port;
+ private String host;
+ private boolean secureServer;
+
+ private ServerCnxnFactory factory;
+ private BindingInformation binding;
+ private File confDir;
+ private StringBuilder diagnostics = new StringBuilder();
+
+ /**
+ * Create an instance
+ * @param name service name
+ */
+ public MicroZookeeperService(String name) {
+ super(name);
+ }
+
+ /**
+ * Get the connection string.
+ * @return the string
+ * @throws IllegalStateException if the connection is not yet valid
+ */
+ public String getConnectionString() {
+ Preconditions.checkState(factory != null, "service not started");
+ InetSocketAddress addr = factory.getLocalAddress();
+ return String.format("%s:%d", addr.getHostName(), addr.getPort());
+ }
+
+ /**
+ * Get the connection address
+ * @return the connection as an address
+ * @throws IllegalStateException if the connection is not yet valid
+ */
+ public InetSocketAddress getConnectionAddress() {
+ Preconditions.checkState(factory != null, "service not started");
+ return factory.getLocalAddress();
+ }
+
+ /**
+ * Create an inet socket addr from the local host + port number
+ * @param port port to use
+ * @return a (hostname, port) pair
+ * @throws UnknownHostException if the server cannot resolve the host
+ */
+ private InetSocketAddress getAddress(int port) throws UnknownHostException {
+ return new InetSocketAddress(host, port < 0 ? 0 : port);
+ }
+
+ /**
+ * Initialize the service, including choosing a path for the data
+ * @param conf configuration
+ * @throws Exception
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ port = conf.getInt(KEY_ZKSERVICE_PORT, 0);
+ tickTime = conf.getInt(KEY_ZKSERVICE_TICK_TIME,
+ ZooKeeperServer.DEFAULT_TICK_TIME);
+ String instancedirname = conf.getTrimmed(
+ KEY_ZKSERVICE_DIR, "");
+ host = conf.getTrimmed(KEY_ZKSERVICE_HOST, DEFAULT_ZKSERVICE_HOST);
+ if (instancedirname.isEmpty()) {
+ File testdir = new File(System.getProperty("test.dir", "target"));
+ instanceDir = new File(testdir, "zookeeper" + getName());
+ } else {
+ instanceDir = new File(instancedirname);
+ FileUtil.fullyDelete(instanceDir);
+ }
+ LOG.debug("Instance directory is {}", instanceDir);
+ mkdirStrict(instanceDir);
+ dataDir = new File(instanceDir, "data");
+ confDir = new File(instanceDir, "conf");
+ mkdirStrict(dataDir);
+ mkdirStrict(confDir);
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Create a directory, ignoring if the dir is already there,
+ * and failing if a file or something else was at the end of that
+ * path
+ * @param dir dir to guarantee the existence of
+ * @throws IOException IO problems, or path exists but is not a dir
+ */
+ private void mkdirStrict(File dir) throws IOException {
+ if (!dir.mkdirs()) {
+ if (!dir.isDirectory()) {
+ throw new IOException("Failed to mkdir " + dir);
+ }
+ }
+ }
+
+ /**
+ * Append a formatted string to the diagnostics.
+ * <p>
+ * A newline is appended afterwards.
+ * @param text text including any format commands
+ * @param args arguments for the forma operation.
+ */
+ protected void addDiagnostics(String text, Object ... args) {
+ diagnostics.append(String.format(text, args)).append('\n');
+ }
+
+ /**
+ * Get the diagnostics info
+ * @return the diagnostics string built up
+ */
+ public String getDiagnostics() {
+ return diagnostics.toString();
+ }
+
+ /**
+ * set up security. this must be done prior to creating
+ * the ZK instance, as it sets up JAAS if that has not been done already.
+ *
+ * @return true if the cluster has security enabled.
+ */
+ public boolean setupSecurity() throws IOException {
+ Configuration conf = getConfig();
+ String jaasContext = conf.getTrimmed(KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT);
+ secureServer = StringUtils.isNotEmpty(jaasContext);
+ if (secureServer) {
+ RegistrySecurity.validateContext(jaasContext);
+ RegistrySecurity.bindZKToServerJAASContext(jaasContext);
+ // policy on failed auth
+ System.setProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS,
+ conf.get(KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS,
+ "true"));
+
+ //needed so that you can use sasl: strings in the registry
+ System.setProperty(RegistryInternalConstants.ZOOKEEPER_AUTH_PROVIDER +".1",
+ RegistryInternalConstants.SASLAUTHENTICATION_PROVIDER);
+ String serverContext =
+ System.getProperty(PROP_ZK_SERVER_SASL_CONTEXT);
+ addDiagnostics("Server JAAS context s = %s", serverContext);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Startup: start ZK. It is only after this that
+ * the binding information is valid.
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+
+ setupSecurity();
+
+ ZooKeeperServer zkServer = new ZooKeeperServer();
+ FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
+ zkServer.setTxnLogFactory(ftxn);
+ zkServer.setTickTime(tickTime);
+
+ LOG.info("Starting Local Zookeeper service");
+ factory = ServerCnxnFactory.createFactory();
+ factory.configure(getAddress(port), -1);
+ factory.startup(zkServer);
+
+ String connectString = getConnectionString();
+ LOG.info("In memory ZK started at {}\n", connectString);
+
+ if (LOG.isDebugEnabled()) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ zkServer.dumpConf(pw);
+ pw.flush();
+ LOG.debug(sw.toString());
+ }
+ binding = new BindingInformation();
+ binding.ensembleProvider = new FixedEnsembleProvider(connectString);
+ binding.description =
+ getName() + " reachable at \"" + connectString + "\"";
+
+ addDiagnostics(binding.description);
+ // finally: set the binding information in the config
+ getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString);
+ }
+
+ /**
+ * When the service is stopped, it deletes the data directory
+ * and its contents
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ if (factory != null) {
+ factory.shutdown();
+ factory = null;
+ }
+ if (dataDir != null) {
+ FileUtil.fullyDelete(dataDir);
+ }
+ }
+
+ @Override
+ public BindingInformation supplyBindingInformation() {
+ Preconditions.checkNotNull(binding,
+ "Service is not started: binding information undefined");
+ return binding;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
new file mode 100644
index 0000000..f4f4976
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+
+/**
+ * Service keys for configuring the {@link MicroZookeeperService}.
+ * These are not used in registry clients or the RM-side service,
+ * so are kept separate.
+ */
+public interface MicroZookeeperServiceKeys {
+ public static final String ZKSERVICE_PREFIX =
+ RegistryConstants.REGISTRY_PREFIX + "zk.service.";
+ /**
+ * Key to define the JAAS context for the ZK service: {@value}.
+ */
+ public static final String KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT =
+ ZKSERVICE_PREFIX + "service.jaas.context";
+
+ /**
+ * ZK servertick time: {@value}
+ */
+ public static final String KEY_ZKSERVICE_TICK_TIME =
+ ZKSERVICE_PREFIX + "ticktime";
+
+ /**
+ * host to register on: {@value}.
+ */
+ public static final String KEY_ZKSERVICE_HOST = ZKSERVICE_PREFIX + "host";
+ /**
+ * Default host to serve on -this is <code>localhost</code> as it
+ * is the only one guaranteed to be available: {@value}.
+ */
+ public static final String DEFAULT_ZKSERVICE_HOST = "localhost";
+ /**
+ * port; 0 or below means "any": {@value}
+ */
+ public static final String KEY_ZKSERVICE_PORT = ZKSERVICE_PREFIX + "port";
+
+ /**
+ * Directory containing data: {@value}
+ */
+ public static final String KEY_ZKSERVICE_DIR = ZKSERVICE_PREFIX + "dir";
+
+ /**
+ * Should failed SASL clients be allowed: {@value}?
+ *
+ * Default is the ZK default: true
+ */
+ public static final String KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS =
+ ZKSERVICE_PREFIX + "allow.failed.sasl.clients";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
new file mode 100644
index 0000000..693bb0b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Administrator service for the registry. This is the one with
+ * permissions to create the base directories and those for users.
+ *
+ * It also includes support for asynchronous operations, so that
+ * zookeeper connectivity problems do not hold up the server code
+ * performing the actions.
+ *
+ * Any action queued via {@link #submit(Callable)} will be
+ * run asynchronously. The {@link #createDirAsync(String, List, boolean)}
+ * is an example of such an an action
+ *
+ * A key async action is the depth-first tree purge, which supports
+ * pluggable policies for deleting entries. The method
+ * {@link #purge(String, NodeSelector, PurgePolicy, BackgroundCallback)}
+ * implements the recursive purge operation —the class
+ * {{AsyncPurge}} provides the asynchronous scheduling of this.
+ */
+public class RegistryAdminService extends RegistryOperationsService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RegistryAdminService.class);
+ /**
+ * The ACL permissions for the user's homedir ACL.
+ */
+ public static final int USER_HOMEDIR_ACL_PERMISSIONS =
+ ZooDefs.Perms.READ | ZooDefs.Perms.WRITE
+ | ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
+
+ /**
+ * Executor for async operations
+ */
+ protected final ExecutorService executor;
+
+ /**
+ * Construct an instance of the service
+ * @param name service name
+ */
+ public RegistryAdminService(String name) {
+ this(name, null);
+ }
+
+ /**
+ * construct an instance of the service, using the
+ * specified binding source to bond to ZK
+ * @param name service name
+ * @param bindingSource provider of ZK binding information
+ */
+ public RegistryAdminService(String name,
+ RegistryBindingSource bindingSource) {
+ super(name, bindingSource);
+ executor = Executors.newCachedThreadPool(
+ new ThreadFactory() {
+ private AtomicInteger counter = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "RegistryAdminService " + counter.getAndIncrement());
+ }
+ });
+ }
+
+ /**
+ * Stop the service: halt the executor.
+ * @throws Exception exception.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ stopExecutor();
+ super.serviceStop();
+ }
+
+ /**
+ * Stop the executor if it is not null.
+ * This uses {@link ExecutorService#shutdownNow()}
+ * and so does not block until they have completed.
+ */
+ protected synchronized void stopExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Get the executor
+ * @return the executor
+ */
+ protected ExecutorService getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Submit a callable
+ * @param callable callable
+ * @param <V> type of the final get
+ * @return a future to wait on
+ */
+ public <V> Future<V> submit(Callable<V> callable) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submitting {}", callable);
+ }
+ return getExecutor().submit(callable);
+ }
+
+ /**
+ * Asynchronous operation to create a directory
+ * @param path path
+ * @param acls ACL list
+ * @param createParents flag to indicate parent dirs should be created
+ * as needed
+ * @return the future which will indicate whether or not the operation
+ * succeeded —and propagate any exceptions
+ * @throws IOException
+ */
+ public Future<Boolean> createDirAsync(final String path,
+ final List<ACL> acls,
+ final boolean createParents) throws IOException {
+ return submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return maybeCreate(path, CreateMode.PERSISTENT,
+ acls, createParents);
+ }
+ });
+ }
+
+ /**
+ * Init operation sets up the system ACLs.
+ * @param conf configuration of the service
+ * @throws Exception
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ RegistrySecurity registrySecurity = getRegistrySecurity();
+ if (registrySecurity.isSecureRegistry()) {
+ ACL sasl = registrySecurity.createSaslACLFromCurrentUser(ZooDefs.Perms.ALL);
+ registrySecurity.addSystemACL(sasl);
+ LOG.info("Registry System ACLs:",
+ RegistrySecurity.aclsToString(
+ registrySecurity.getSystemACLs()));
+ }
+ }
+
+ /**
+ * Start the service, including creating base directories with permissions
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ // create the root directories
+ try {
+ createRootRegistryPaths();
+ } catch (NoPathPermissionsException e) {
+
+ String message = String.format(Locale.ENGLISH,
+ "Failed to create root paths {%s};" +
+ "\ndiagnostics={%s}" +
+ "\ncurrent registry is:" +
+ "\n{%s}",
+ e,
+ bindingDiagnosticDetails(),
+ dumpRegistryRobustly(true));
+
+ LOG.error(" Failure {}", e, e);
+ LOG.error(message);
+
+ // TODO: this is something temporary to deal with the problem
+ // that jenkins is failing this test
+ throw new NoPathPermissionsException(e.getPath().toString(), message, e);
+ }
+ }
+
+ /**
+ * Create the initial registry paths
+ * @throws IOException any failure
+ */
+ @VisibleForTesting
+ public void createRootRegistryPaths() throws IOException {
+
+ List<ACL> systemACLs = getRegistrySecurity().getSystemACLs();
+ LOG.info("System ACLs {}",
+ RegistrySecurity.aclsToString(systemACLs));
+ maybeCreate("", CreateMode.PERSISTENT, systemACLs, false);
+ maybeCreate(PATH_USERS, CreateMode.PERSISTENT,
+ systemACLs, false);
+ maybeCreate(PATH_SYSTEM_SERVICES,
+ CreateMode.PERSISTENT,
+ systemACLs, false);
+ }
+
+ /**
+ * Get the path to a user's home dir
+ * @param username username
+ * @return a path for services underneath
+ */
+ protected String homeDir(String username) {
+ return RegistryUtils.homePathForUser(username);
+ }
+
+ /**
+ * Set up the ACL for the user.
+ * <b>Important: this must run client-side as it needs
+ * to know the id:pass tuple for a user</b>
+ * @param username user name
+ * @param perms permissions
+ * @return an ACL list
+ * @throws IOException ACL creation/parsing problems
+ */
+ public List<ACL> aclsForUser(String username, int perms) throws IOException {
+ List<ACL> clientACLs = getClientAcls();
+ RegistrySecurity security = getRegistrySecurity();
+ if (security.isSecureRegistry()) {
+ clientACLs.add(security.createACLfromUsername(username, perms));
+ }
+ return clientACLs;
+ }
+
+ /**
+ * Start an async operation to create the home path for a user
+ * if it does not exist
+ * @param shortname username, without any @REALM in kerberos
+ * @return the path created
+ * @throws IOException any failure while setting up the operation
+ *
+ */
+ public Future<Boolean> initUserRegistryAsync(final String shortname)
+ throws IOException {
+
+ String homeDir = homeDir(shortname);
+ if (!exists(homeDir)) {
+ // create the directory. The user does not
+ return createDirAsync(homeDir,
+ aclsForUser(shortname,
+ USER_HOMEDIR_ACL_PERMISSIONS),
+ false);
+ }
+ return null;
+ }
+
+ /**
+ * Create the home path for a user if it does not exist.
+ *
+ * This uses {@link #initUserRegistryAsync(String)} and then waits for the
+ * result ... the code path is the same as the async operation; this just
+ * picks up and relays/converts exceptions
+ * @param username username
+ * @return the path created
+ * @throws IOException any failure
+ *
+ */
+ public String initUserRegistry(final String username)
+ throws IOException {
+
+ try {
+ Future<Boolean> future = initUserRegistryAsync(username);
+ future.get();
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)
+ (new InterruptedIOException(e.toString()).initCause(e));
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) (cause);
+ } else {
+ throw new IOException(cause.toString(), cause);
+ }
+ }
+
+ return homeDir(username);
+ }
+
+ /**
+ * Method to validate the validity of the kerberos realm.
+ * <ul>
+ * <li>Insecure: not needed.</li>
+ * <li>Secure: must have been determined.</li>
+ * </ul>
+ */
+ protected void verifyRealmValidity() throws ServiceStateException {
+ if (isSecure()) {
+ String realm = getRegistrySecurity().getKerberosRealm();
+ if (StringUtils.isEmpty(realm)) {
+ throw new ServiceStateException("Cannot determine service realm");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Started Registry operations in realm {}", realm);
+ }
+ }
+ }
+
+ /**
+ * Policy to purge entries
+ */
+ public enum PurgePolicy {
+ PurgeAll,
+ FailOnChildren,
+ SkipOnChildren
+ }
+
+ /**
+ * Recursive operation to purge all matching records under a base path.
+ * <ol>
+ * <li>Uses a depth first search</li>
+ * <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
+ * <li>If a record matches then it is deleted without any child searches</li>
+ * <li>Deletions will be asynchronous if a callback is provided</li>
+ * </ol>
+ *
+ * The code is designed to be robust against parallel deletions taking place;
+ * in such a case it will stop attempting that part of the tree. This
+ * avoid the situation of more than 1 purge happening in parallel and
+ * one of the purge operations deleteing the node tree above the other.
+ * @param path base path
+ * @param selector selector for the purge policy
+ * @param purgePolicy what to do if there is a matching record with children
+ * @param callback optional curator callback
+ * @return the number of delete operations perfomed. As deletes may be for
+ * everything under a path, this may be less than the number of records
+ * actually deleted
+ * @throws IOException problems
+ * @throws PathIsNotEmptyDirectoryException if an entry cannot be deleted
+ * as it has children and the purge policy is FailOnChildren
+ */
+ @VisibleForTesting
+ public int purge(String path,
+ NodeSelector selector,
+ PurgePolicy purgePolicy,
+ BackgroundCallback callback) throws IOException {
+
+
+ boolean toDelete = false;
+ // look at self to see if it has a service record
+ Map<String, RegistryPathStatus> childEntries;
+ Collection<RegistryPathStatus> entries;
+ try {
+ // list this path's children
+ childEntries = RegistryUtils.statChildren(this, path);
+ entries = childEntries.values();
+ } catch (PathNotFoundException e) {
+ // there's no record here, it may have been deleted already.
+ // exit
+ return 0;
+ }
+
+ try {
+ RegistryPathStatus registryPathStatus = stat(path);
+ ServiceRecord serviceRecord = resolve(path);
+ // there is now an entry here.
+ toDelete = selector.shouldSelect(path, registryPathStatus, serviceRecord);
+ } catch (EOFException ignored) {
+ // ignore
+ } catch (InvalidRecordException ignored) {
+ // ignore
+ } catch (NoRecordException ignored) {
+ // ignore
+ } catch (PathNotFoundException e) {
+ // there's no record here, it may have been deleted already.
+ // exit
+ return 0;
+ }
+
+ if (toDelete && !entries.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Match on record @ {} with children ", path);
+ }
+ // there's children
+ switch (purgePolicy) {
+ case SkipOnChildren:
+ // don't do the deletion... continue to next record
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping deletion");
+ }
+ toDelete = false;
+ break;
+ case PurgeAll:
+ // mark for deletion
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling for deletion with children");
+ }
+ toDelete = true;
+ entries = new ArrayList<RegistryPathStatus>(0);
+ break;
+ case FailOnChildren:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failing deletion operation");
+ }
+ throw new PathIsNotEmptyDirectoryException(path);
+ }
+ }
+
+ int deleteOps = 0;
+ if (toDelete) {
+ try {
+ zkDelete(path, true, callback);
+ } catch (PathNotFoundException e) {
+ // sign that the path was deleted during the operation.
+ // this is a no-op, and all children can be skipped
+ return deleteOps;
+ }
+ deleteOps++;
+ }
+
+ // now go through the children
+ for (RegistryPathStatus status : entries) {
+ String childname = status.path;
+ String childpath = RegistryPathUtils.join(path, childname);
+ deleteOps += purge(childpath,
+ selector,
+ purgePolicy,
+ callback);
+ }
+
+ return deleteOps;
+ }
+
+ /**
+ * Comparator used for purge logic
+ */
+ public interface NodeSelector {
+
+ boolean shouldSelect(String path,
+ RegistryPathStatus registryPathStatus,
+ ServiceRecord serviceRecord);
+ }
+
+ /**
+ * An async registry purge action taking
+ * a selector which decides what to delete
+ */
+ public class AsyncPurge implements Callable<Integer> {
+
+ private final BackgroundCallback callback;
+ private final NodeSelector selector;
+ private final String path;
+ private final PurgePolicy purgePolicy;
+
+ public AsyncPurge(String path,
+ NodeSelector selector,
+ PurgePolicy purgePolicy,
+ BackgroundCallback callback) {
+ this.callback = callback;
+ this.selector = selector;
+ this.path = path;
+ this.purgePolicy = purgePolicy;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing {}", this);
+ }
+ return purge(path,
+ selector,
+ purgePolicy,
+ callback);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "Record purge under %s with selector %s",
+ path, selector);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
new file mode 100644
index 0000000..85d24b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Basic services for the YARN registry
+ * <ul>
+ * <li>The {@link org.apache.hadoop.registry.server.services.RegistryAdminService}</ol>
+ * extends the shared Yarn Registry client with registry setup and
+ * (potentially asynchronous) administrative actions.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.registry.server.services.MicroZookeeperService}
+ * is a transient Zookeeper instance bound to the YARN service lifecycle.
+ * It is suitable for testing.
+ * </li>
+ * <li>
+ * The {@link org.apache.hadoop.registry.server.services.AddingCompositeService}
+ * extends the standard YARN composite service by making its add and remove
+ * methods public. It is a utility service used in parts of the codebase
+ * </li>
+ *
+ * </ul>
+ *
+ */
+package org.apache.hadoop.registry.server.services;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
new file mode 100644
index 0000000..1c19ade
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
@@ -0,0 +1,538 @@
+---------------------------- MODULE yarnregistry ----------------------------
+
+EXTENDS FiniteSets, Sequences, Naturals, TLC
+
+
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *)
+
+(*
+
+============================================================================
+
+This defines the YARN registry in terms of operations on sets of records.
+
+Every registry entry is represented as a record containing both the path and the data.
+
+It assumes that
+
+1. operations on this set are immediate.
+2. selection operations (such as \A and \E are atomic)
+3. changes are immediately visible to all other users of the registry.
+4. This clearly implies that changes are visible in the sequence in which they happen.
+
+A multi-server Zookeeper-based registry may not meet all those assumptions
+
+1. changes may take time to propagate across the ZK quorum, hence changes cannot
+be considered immediate from the perspective of other registry clients.
+(assumptions (1) and (3)).
+
+2. Selection operations may not be atomic. (assumption (2)).
+
+Operations will still happen in the order received by the elected ZK master
+
+A stricter definition would try to state that all operations are eventually
+true excluding other changes happening during a sequence of action.
+This is left as an excercise for the reader.
+
+The specification also omits all coverage of the permissions policy.
+*)
+
+
+
+CONSTANTS
+ PathChars, \* the set of valid characters in a path
+ Paths, \* the set of all possible valid paths
+ Data, \* the set of all possible sequences of bytes
+ Address, \* the set of all possible address n-tuples
+ Addresses, \* the set of all possible address instances
+ Endpoints , \* the set of all possible endpoints
+ PersistPolicies,\* the set of persistence policies
+ ServiceRecords, \* all service records
+ Registries, \* the set of all possile registries
+ BindActions, \* all possible put actions
+ DeleteActions, \* all possible delete actions
+ PurgeActions, \* all possible purge actions
+ MknodeActions \* all possible mkdir actions
+
+
+
+(* the registry*)
+VARIABLE registry
+
+(* Sequence of actions to apply to the registry *)
+VARIABLE actions
+
+----------------------------------------------------------------------------------------
+(* Tuple of all variables. *)
+
+
+vars == << registry, actions >>
+
+
+----------------------------------------------------------------------------------------
+
+
+
+
+(* Persistence policy *)
+PersistPolicySet == {
+ "", \* Undefined; field not present. PERMANENT is implied.
+ "permanent", \* persists until explicitly removed
+ "application", \* persists until the application finishes
+ "application-attempt", \* persists until the application attempt finishes
+ "container" \* persists until the container finishes
+ }
+
+(* Type invariants. *)
+TypeInvariant ==
+ /\ \A p \in PersistPolicies: p \in PersistPolicySet
+
+
+
+----------------------------------------------------------------------------------------
+
+
+
+(*
+
+An Entry is defined as a path, and the actual
+data which it contains.
+
+By including the path in an entry, we avoid having to define some
+function mapping Path -> entry. Instead a registry can be defined as a
+set of RegistryEntries matching the validity critera.
+
+*)
+
+RegistryEntry == [
+ \* The path to the entry
+ path: Paths,
+
+ \* the data in the entry
+ data: Data
+ ]
+
+
+(*
+ An endpoint in a service record
+*)
+Endpoint == [
+ \* API of the endpoint: some identifier
+ api: STRING,
+
+ \* A list of address n-tuples
+ addresses: Addresses
+]
+
+(* Attributes are the set of all string to string mappings *)
+
+Attributes == [
+STRING |-> STRING
+]
+
+(*
+ A service record
+*)
+ServiceRecord == [
+ \* ID -used when applying the persistence policy
+ yarn_id: STRING,
+
+ \* the persistence policy
+ yarn_persistence: PersistPolicySet,
+
+ \*A description
+ description: STRING,
+
+ \* A set of endpoints
+ external: Endpoints,
+
+ \* Endpoints intended for use internally
+ internal: Endpoints,
+
+ \* Attributes are a function
+ attributes: Attributes
+]
+
+
+----------------------------------------------------------------------------------------
+
+(* Action Records *)
+
+putAction == [
+ type: "put",
+ record: ServiceRecord
+]
+
+deleteAction == [
+ type: "delete",
+ path: STRING,
+ recursive: BOOLEAN
+]
+
+purgeAction == [
+ type: "purge",
+ path: STRING,
+ persistence: PersistPolicySet
+]
+
+mkNodeAction == [
+ type: "mknode",
+ path: STRING,
+ parents: BOOLEAN
+]
+
+
+----------------------------------------------------------------------------------------
+
+(*
+
+ Path operations
+
+*)
+
+(*
+Parent is defined for non empty sequences
+ *)
+
+parent(path) == SubSeq(path, 1, Len(path)-1)
+
+isParent(path, c) == path = parent(c)
+
+----------------------------------------------------------------------------------------
+(*
+Registry Access Operations
+*)
+
+(*
+Lookup all entries in a registry with a matching path
+*)
+
+resolve(Registry, path) == \A entry \in Registry: entry.path = path
+
+(*
+A path exists in the registry iff there is an entry with that path
+*)
+
+exists(Registry, path) == resolve(Registry, path) /= {}
+
+(*
+A parent entry, or an empty set if there is none
+*)
+parentEntry(Registry, path) == resolve(Registry, parent(path))
+
+(*
+A root path is the empty sequence
+*)
+isRootPath(path) == path = <<>>
+
+(*
+The root entry is the entry whose path is the root path
+*)
+isRootEntry(entry) == entry.path = <<>>
+
+
+(*
+A path p is an ancestor of another path d if they are different, and the path d
+starts with path p
+*)
+
+isAncestorOf(path, d) ==
+ /\ path /= d
+ /\ \E k : SubSeq(d, 0, k) = path
+
+
+ancestorPathOf(path) ==
+ \A a \in Paths: isAncestorOf(a, path)
+
+(*
+The set of all children of a path in the registry
+*)
+
+children(R, path) == \A c \in R: isParent(path, c.path)
+
+(*
+A path has children if the children() function does not return the empty set
+*)
+hasChildren(R, path) == children(R, path) /= {}
+
+(*
+Descendant: a child of a path or a descendant of a child of a path
+*)
+
+descendants(R, path) == \A e \in R: isAncestorOf(path, e.path)
+
+(*
+Ancestors: all entries in the registry whose path is an entry of the path argument
+*)
+ancestors(R, path) == \A e \in R: isAncestorOf(e.path, path)
+
+(*
+The set of entries that are a path and its descendants
+*)
+pathAndDescendants(R, path) ==
+ \/ \A e \in R: isAncestorOf(path, e.path)
+ \/ resolve(R, path)
+
+
+(*
+For validity, all entries must match the following criteria
+ *)
+
+validRegistry(R) ==
+ \* there can be at most one entry for a path.
+ /\ \A e \in R: Cardinality(resolve(R, e.path)) = 1
+
+ \* There's at least one root entry
+ /\ \E e \in R: isRootEntry(e)
+
+ \* an entry must be the root entry or have a parent entry
+ /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
+
+ \* If the entry has data, it must be a service record
+ /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+
+
+----------------------------------------------------------------------------------------
+(*
+Registry Manipulation
+*)
+
+(*
+An entry can be put into the registry iff
+its parent is present or it is the root entry
+*)
+canBind(R, e) ==
+ isRootEntry(e) \/ exists(R, parent(e.path))
+
+(*
+'bind() adds/replaces an entry if permitted
+*)
+
+bind(R, e) ==
+ /\ canBind(R, e)
+ /\ R' = (R \ resolve(R, e.path)) \union {e}
+
+
+(*
+mknode() adds a new empty entry where there was none before, iff
+-the parent exists
+-it meets the requirement for being "bindable"
+*)
+
+mknodeSimple(R, path) ==
+ LET record == [ path |-> path, data |-> <<>> ]
+ IN \/ exists(R, path)
+ \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
+
+
+(*
+For all parents, the mknodeSimpl() criteria must apply.
+This could be defined recursively, though as TLA+ does not support recursion,
+an alternative is required
+
+
+Because this specification is declaring the final state of a operation, not
+the implemental, all that is needed is to describe those parents.
+
+It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+
+*)
+mknodeWithParents(R, path) ==
+ /\ \A p2 \in ancestors(R, path) : mknodeSimple(R, p2)
+ /\ mknodeSimple(R, path)
+
+
+mknode(R, path, recursive) ==
+ IF recursive THEN mknodeWithParents(R, path) ELSE mknodeSimple(R, path)
+
+(*
+Deletion is set difference on any existing entries
+*)
+
+simpleDelete(R, path) ==
+ /\ ~isRootPath(path)
+ /\ children(R, path) = {}
+ /\ R' = R \ resolve(R, path)
+
+(*
+Recursive delete: neither the path or its descendants exists in the new registry
+*)
+
+recursiveDelete(R, path) ==
+ \* Root path: the new registry is the initial registry again
+ /\ isRootPath(path) => R' = { [ path |-> <<>>, data |-> <<>> ] }
+ \* Any other entry: the new registry is a set with any existing
+ \* entry for that path is removed, and the new entry added
+ /\ ~isRootPath(path) => R' = R \ ( resolve(R, path) \union descendants(R, path))
+
+
+(*
+Delete operation which chooses the recursiveness policy based on an argument
+*)
+
+delete(R, path, recursive) ==
+ IF recursive THEN recursiveDelete(R, path) ELSE simpleDelete(R, path)
+
+
+(*
+Purge ensures that all entries under a path with the matching ID and policy are not there
+afterwards
+*)
+
+purge(R, path, id, persistence) ==
+ /\ (persistence \in PersistPolicySet)
+ /\ \A p2 \in pathAndDescendants(R, path) :
+ (p2.attributes["yarn:id"] = id /\ p2.attributes["yarn:persistence"] = persistence)
+ => recursiveDelete(R, p2.path)
+
+(*
+resolveRecord() resolves the record at a path or fails.
+
+It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
+is guaranteed to return the single entry of that set, iff the choice predicate holds.
+
+Using a predicate of TRUE, it always succeeds, so this function selects
+the sole entry of the resolve operation.
+*)
+
+resolveRecord(R, path) ==
+ LET l == resolve(R, path) IN
+ /\ Cardinality(l) = 1
+ /\ CHOOSE e \in l : TRUE
+
+(*
+The specific action of putting an entry into a record includes validating the record
+*)
+
+validRecordToBind(path, record) ==
+ \* The root entry must have permanent persistence
+ isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
+ \/ record.attributes["yarn:persistence"] = "")
+
+
+(*
+Binding a service record involves validating it then putting it in the registry
+marshalled as the data in the entry
+ *)
+bindRecord(R, path, record) ==
+ /\ validRecordToBind(path, record)
+ /\ bind(R, [path |-> path, data |-> record])
+
+
+----------------------------------------------------------------------------------------
+
+
+
+(*
+The action queue can only contain one of the sets of action types, and
+by giving each a unique name, those sets are guaranteed to be disjoint
+*)
+ QueueInvariant ==
+ /\ \A a \in actions:
+ \/ (a \in BindActions /\ a.type="bind")
+ \/ (a \in DeleteActions /\ a.type="delete")
+ \/ (a \in PurgeActions /\ a.type="purge")
+ \/ (a \in MknodeActions /\ a.type="mknode")
+
+
+(*
+Applying queued actions
+*)
+
+applyAction(R, a) ==
+ \/ (a \in BindActions /\ bindRecord(R, a.path, a.record) )
+ \/ (a \in MknodeActions /\ mknode(R, a.path, a.recursive) )
+ \/ (a \in DeleteActions /\ delete(R, a.path, a.recursive) )
+ \/ (a \in PurgeActions /\ purge(R, a.path, a.id, a.persistence))
+
+
+(*
+Apply the first action in a list and then update the actions
+*)
+applyFirstAction(R, a) ==
+ /\ actions /= <<>>
+ /\ applyAction(R, Head(a))
+ /\ actions' = Tail(a)
+
+
+Next == applyFirstAction(registry, actions)
+
+(*
+All submitted actions must eventually be applied.
+*)
+
+
+Liveness == <>( actions = <<>> )
+
+
+(*
+The initial state of a registry has the root entry.
+*)
+
+InitialRegistry == registry = {
+ [ path |-> <<>>, data |-> <<>> ]
+}
+
+
+(*
+The valid state of the "registry" variable is defined as
+Via the validRegistry predicate
+*)
+
+ValidRegistryState == validRegistry(registry)
+
+
+
+(*
+The initial state of the system
+*)
+InitialState ==
+ /\ InitialRegistry
+ /\ ValidRegistryState
+ /\ actions = <<>>
+
+
+(*
+The registry has an initial state, the series of state changes driven by the actions,
+and the requirement that it does act on those actions.
+*)
+RegistrySpec ==
+ /\ InitialState
+ /\ [][Next]_vars
+ /\ Liveness
+
+
+----------------------------------------------------------------------------------------
+
+(*
+Theorem: For all operations from that initial state, the registry state is still valid
+*)
+THEOREM InitialState => [] ValidRegistryState
+
+(*
+Theorem: for all operations from that initial state, the type invariants hold
+*)
+THEOREM InitialState => [] TypeInvariant
+
+(*
+Theorem: the queue invariants hold
+*)
+THEOREM InitialState => [] QueueInvariant
+
+=============================================================================
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
new file mode 100644
index 0000000..5b34f60
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+/**
+ * Abstract registry tests .. inits the field {@link #registry}
+ * before the test with an instance of {@link RMRegistryOperationsService};
+ * and {@link #operations} with the same instance cast purely
+ * to the type {@link RegistryOperations}.
+ *
+ */
+public class AbstractRegistryTest extends AbstractZKRegistryTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractRegistryTest.class);
+ protected RMRegistryOperationsService registry;
+ protected RegistryOperations operations;
+
+ @Before
+ public void setupRegistry() throws IOException {
+ registry = new RMRegistryOperationsService("yarnRegistry");
+ operations = registry;
+ registry.init(createRegistryConfiguration());
+ registry.start();
+ operations.delete("/", true);
+ registry.createRootRegistryPaths();
+ addToTeardown(registry);
+ }
+
+ /**
+ * Create a service entry with the sample endpoints, and put it
+ * at the destination
+ * @param path path
+ * @param createFlags flags
+ * @return the record
+ * @throws IOException on a failure
+ */
+ protected ServiceRecord putExampleServiceEntry(String path, int createFlags) throws
+ IOException,
+ URISyntaxException {
+ return putExampleServiceEntry(path, createFlags, PersistencePolicies.PERMANENT);
+ }
+
+ /**
+ * Create a service entry with the sample endpoints, and put it
+ * at the destination
+ * @param path path
+ * @param createFlags flags
+ * @return the record
+ * @throws IOException on a failure
+ */
+ protected ServiceRecord putExampleServiceEntry(String path,
+ int createFlags,
+ String persistence)
+ throws IOException, URISyntaxException {
+ ServiceRecord record = buildExampleServiceEntry(persistence);
+
+ registry.mknode(RegistryPathUtils.parentOf(path), true);
+ operations.bind(path, record, createFlags);
+ return record;
+ }
+
+ /**
+ * Assert a path exists
+ * @param path path in the registry
+ * @throws IOException
+ */
+ public void assertPathExists(String path) throws IOException {
+ operations.stat(path);
+ }
+
+ /**
+ * assert that a path does not exist
+ * @param path path in the registry
+ * @throws IOException
+ */
+ public void assertPathNotFound(String path) throws IOException {
+ try {
+ operations.stat(path);
+ fail("Path unexpectedly found: " + path);
+ } catch (PathNotFoundException e) {
+
+ }
+ }
+
+ /**
+ * Assert that a path resolves to a service record
+ * @param path path in the registry
+ * @throws IOException
+ */
+ public void assertResolves(String path) throws IOException {
+ operations.resolve(path);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
new file mode 100644
index 0000000..bcff622
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.server.services.AddingCompositeService;
+import org.apache.hadoop.registry.server.services.MicroZookeeperService;
+import org.apache.hadoop.registry.server.services.MicroZookeeperServiceKeys;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AbstractZKRegistryTest extends RegistryTestHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractZKRegistryTest.class);
+
+ private static final AddingCompositeService servicesToTeardown =
+ new AddingCompositeService("teardown");
+ // static initializer guarantees it is always started
+ // ahead of any @BeforeClass methods
+ static {
+ servicesToTeardown.init(new Configuration());
+ servicesToTeardown.start();
+ }
+
+ @Rule
+ public final Timeout testTimeout = new Timeout(10000);
+
+ @Rule
+ public TestName methodName = new TestName();
+
+ protected static void addToTeardown(Service svc) {
+ servicesToTeardown.addService(svc);
+ }
+
+ @AfterClass
+ public static void teardownServices() throws IOException {
+ describe(LOG, "teardown of static services");
+ servicesToTeardown.close();
+ }
+
+ protected static MicroZookeeperService zookeeper;
+
+
+ @BeforeClass
+ public static void createZKServer() throws Exception {
+ File zkDir = new File("target/zookeeper");
+ FileUtils.deleteDirectory(zkDir);
+ assertTrue(zkDir.mkdirs());
+ zookeeper = new MicroZookeeperService("InMemoryZKService");
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MicroZookeeperServiceKeys.KEY_ZKSERVICE_DIR, zkDir.getAbsolutePath());
+ zookeeper.init(conf);
+ zookeeper.start();
+ addToTeardown(zookeeper);
+ }
+
+ /**
+ * give our thread a name
+ */
+ @Before
+ public void nameThread() {
+ Thread.currentThread().setName("JUnit");
+ }
+
+ /**
+ * Returns the connection string to use
+ *
+ * @return connection string
+ */
+ public String getConnectString() {
+ return zookeeper.getConnectionString();
+ }
+
+ public YarnConfiguration createRegistryConfiguration() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000);
+ conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500);
+ conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 10);
+ conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 10);
+ conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM,
+ zookeeper.getConnectionString());
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
new file mode 100644
index 0000000..38cc2cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.registry.secure.AbstractSecureRegistryTest;
+import org.apache.zookeeper.common.PathUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+
+/**
+ * This is a set of static methods to aid testing the registry operations.
+ * The methods can be imported statically —or the class used as a base
+ * class for tests.
+ */
+public class RegistryTestHelper extends Assert {
+ public static final String SC_HADOOP = "org-apache-hadoop";
+ public static final String USER = "devteam/";
+ public static final String NAME = "hdfs";
+ public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
+ public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+ public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
+ public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
+ public static final String ENTRY_PATH = PARENT_PATH + NAME;
+ public static final String NNIPC = "nnipc";
+ public static final String IPC2 = "IPC2";
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RegistryTestHelper.class);
+ public static final String KTUTIL = "ktutil";
+ private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
+ new RegistryUtils.ServiceRecordMarshal();
+
+ /**
+ * Assert the path is valid by ZK rules
+ * @param path path to check
+ */
+ public static void assertValidZKPath(String path) {
+ try {
+ PathUtils.validatePath(path);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid Path " + path + ": " + e, e);
+ }
+ }
+
+ /**
+ * Assert that a string is not empty (null or "")
+ * @param message message to raise if the string is empty
+ * @param check string to check
+ */
+ public static void assertNotEmpty(String message, String check) {
+ if (StringUtils.isEmpty(check)) {
+ fail(message);
+ }
+ }
+
+ /**
+ * Assert that a string is empty (null or "")
+ * @param check string to check
+ */
+ public static void assertNotEmpty(String check) {
+ if (StringUtils.isEmpty(check)) {
+ fail("Empty string");
+ }
+ }
+
+ /**
+ * Log the details of a login context
+ * @param name name to assert that the user is logged in as
+ * @param loginContext the login context
+ */
+ public static void logLoginDetails(String name,
+ LoginContext loginContext) {
+ assertNotNull("Null login context", loginContext);
+ Subject subject = loginContext.getSubject();
+ LOG.info("Logged in as {}:\n {}", name, subject);
+ }
+
+ /**
+ * Set the JVM property to enable Kerberos debugging
+ */
+ public static void enableKerberosDebugging() {
+ System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG,
+ "true");
+ }
+ /**
+ * Set the JVM property to enable Kerberos debugging
+ */
+ public static void disableKerberosDebugging() {
+ System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG,
+ "false");
+ }
+
+ /**
+ * General code to validate bits of a component/service entry built iwth
+ * {@link #addSampleEndpoints(ServiceRecord, String)}
+ * @param record instance to check
+ */
+ public static void validateEntry(ServiceRecord record) {
+ assertNotNull("null service record", record);
+ List<Endpoint> endpoints = record.external;
+ assertEquals(2, endpoints.size());
+
+ Endpoint webhdfs = findEndpoint(record, API_WEBHDFS, true, 1, 1);
+ assertEquals(API_WEBHDFS, webhdfs.api);
+ assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
+ assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
+ List<List<String>> addressList = webhdfs.addresses;
+ List<String> url = addressList.get(0);
+ String addr = url.get(0);
+ assertTrue(addr.contains("http"));
+ assertTrue(addr.contains(":8020"));
+
+ Endpoint nnipc = findEndpoint(record, NNIPC, false, 1,2);
+ assertEquals("wrong protocol in " + nnipc, ProtocolTypes.PROTOCOL_THRIFT,
+ nnipc.protocolType);
+
+ Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+
+ Endpoint web = findEndpoint(record, "web", true, 1, 1);
+ assertEquals(1, web.addresses.size());
+ assertEquals(1, web.addresses.get(0).size());
+ }
+
+ /**
+ * Assert that an endpoint matches the criteria
+ * @param endpoint endpoint to examine
+ * @param addressType expected address type
+ * @param protocolType expected protocol type
+ * @param api API
+ */
+ public static void assertMatches(Endpoint endpoint,
+ String addressType,
+ String protocolType,
+ String api) {
+ assertNotNull(endpoint);
+ assertEquals(addressType, endpoint.addressType);
+ assertEquals(protocolType, endpoint.protocolType);
+ assertEquals(api, endpoint.api);
+ }
+
+ /**
+ * Assert the records match.
+ * @param source record that was written
+ * @param resolved the one that resolved.
+ */
+ public static void assertMatches(ServiceRecord source, ServiceRecord resolved) {
+ assertNotNull("Null source record ", source);
+ assertNotNull("Null resolved record ", resolved);
+ assertEquals(source.description, resolved.description);
+
+ Map<String, String> srcAttrs = source.attributes();
+ Map<String, String> resolvedAttrs = resolved.attributes();
+ String sourceAsString = source.toString();
+ String resolvedAsString = resolved.toString();
+ assertEquals("Wrong count of attrs in \n" + sourceAsString
+ + "\nfrom\n" + resolvedAsString,
+ srcAttrs.size(),
+ resolvedAttrs.size());
+ for (Map.Entry<String, String> entry : srcAttrs.entrySet()) {
+ String attr = entry.getKey();
+ assertEquals("attribute "+ attr, entry.getValue(), resolved.get(attr));
+ }
+ assertEquals("wrong external endpoint count",
+ source.external.size(), resolved.external.size());
+ assertEquals("wrong external endpoint count",
+ source.internal.size(), resolved.internal.size());
+ }
+
+ /**
+ * Find an endpoint in a record or fail,
+ * @param record record
+ * @param api API
+ * @param external external?
+ * @param addressElements expected # of address elements?
+ * @param addressTupleSize expected size of a type
+ * @return the endpoint.
+ */
+ public static Endpoint findEndpoint(ServiceRecord record,
+ String api, boolean external, int addressElements, int addressTupleSize) {
+ Endpoint epr = external ? record.getExternalEndpoint(api)
+ : record.getInternalEndpoint(api);
+ if (epr != null) {
+ assertEquals("wrong # of addresses",
+ addressElements, epr.addresses.size());
+ assertEquals("wrong # of elements in an address tuple",
+ addressTupleSize, epr.addresses.get(0).size());
+ return epr;
+ }
+ List<Endpoint> endpoints = external ? record.external : record.internal;
+ StringBuilder builder = new StringBuilder();
+ for (Endpoint endpoint : endpoints) {
+ builder.append("\"").append(endpoint).append("\" ");
+ }
+ fail("Did not find " + api + " in endpoints " + builder);
+ // never reached; here to keep the compiler happy
+ return null;
+ }
+
+ /**
+ * Log a record
+ * @param name record name
+ * @param record details
+ * @throws IOException only if something bizarre goes wrong marshalling
+ * a record.
+ */
+ public static void logRecord(String name, ServiceRecord record) throws
+ IOException {
+ LOG.info(" {} = \n{}\n", name, recordMarshal.toJson(record));
+ }
+
+ /**
+ * Create a service entry with the sample endpoints
+ * @param persistence persistence policy
+ * @return the record
+ * @throws IOException on a failure
+ */
+ public static ServiceRecord buildExampleServiceEntry(String persistence) throws
+ IOException,
+ URISyntaxException {
+ ServiceRecord record = new ServiceRecord();
+ record.set(YarnRegistryAttributes.YARN_ID, "example-0001");
+ record.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence);
+ addSampleEndpoints(record, "namenode");
+ return record;
+ }
+
+ /**
+ * Add some endpoints
+ * @param entry entry
+ */
+ public static void addSampleEndpoints(ServiceRecord entry, String hostname)
+ throws URISyntaxException {
+ assertNotNull(hostname);
+ entry.addExternalEndpoint(webEndpoint("web",
+ new URI("http", hostname + ":80", "/")));
+ entry.addExternalEndpoint(
+ restEndpoint(API_WEBHDFS,
+ new URI("http", hostname + ":8020", "/")));
+
+ Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
+ endpoint.addresses.add(tuple(hostname, "8030"));
+ entry.addInternalEndpoint(endpoint);
+ InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
+ entry.addInternalEndpoint(
+ inetAddrEndpoint(NNIPC, ProtocolTypes.PROTOCOL_THRIFT, "localhost",
+ 8050));
+ entry.addInternalEndpoint(
+ RegistryTypeUtils.ipcEndpoint(
+ IPC2,
+ true,
+ RegistryTypeUtils.marshall(localhost)));
+ }
+
+ /**
+ * Describe the stage in the process with a box around it -so as
+ * to highlight it in test logs
+ * @param log log to use
+ * @param text text
+ * @param args logger args
+ */
+ public static void describe(Logger log, String text, Object...args) {
+ log.info("\n=======================================");
+ log.info(text, args);
+ log.info("=======================================\n");
+ }
+
+ /**
+ * log out from a context if non-null ... exceptions are caught and logged
+ * @param login login context
+ * @return null, always
+ */
+ public static LoginContext logout(LoginContext login) {
+ try {
+ if (login != null) {
+ LOG.debug("Logging out login context {}", login.toString());
+ login.logout();
+ }
+ } catch (LoginException e) {
+ LOG.warn("Exception logging out: {}", e, e);
+ }
+ return null;
+ }
+
+ /**
+ * Exec the native <code>ktutil</code> to list the keys
+ * (primarily to verify that the generated keytabs are compatible).
+ * This operation is not executed on windows. On other platforms
+ * it requires <code>ktutil</code> to be installed and on the path
+ * <pre>
+ * ktutil --keytab=target/kdc/zookeeper.keytab list --keys
+ * </pre>
+ * @param keytab keytab to list
+ * @throws IOException on any execution problem, including the executable
+ * being missing
+ */
+ public static String ktList(File keytab) throws IOException {
+ if (!Shell.WINDOWS) {
+ String path = keytab.getAbsolutePath();
+ String out = Shell.execCommand(
+ KTUTIL,
+ "--keytab=" + path,
+ "list",
+ "--keys"
+ );
+ LOG.info("Listing of keytab {}:\n{}\n", path, out);
+ return out;
+ }
+ return "";
+ }
+
+ /**
+ * Perform a robust <code>ktutils -l</code> ... catches and ignores
+ * exceptions, otherwise the output is logged.
+ * @param keytab keytab to list
+ * @return the result of the operation, or "" on any problem
+ */
+ public static String ktListRobust(File keytab) {
+ try {
+ return ktList(keytab);
+ } catch (IOException e) {
+ // probably not on the path
+ return "";
+ }
+ }
+
+ /**
+ * Login via a UGI. Requres UGI to have been set up
+ * @param user username
+ * @param keytab keytab to list
+ * @return the UGI
+ * @throws IOException
+ */
+ public static UserGroupInformation loginUGI(String user, File keytab) throws
+ IOException {
+ LOG.info("Logging in as {} from {}", user, keytab);
+ return UserGroupInformation.loginUserFromKeytabAndReturnUGI(user,
+ keytab.getAbsolutePath());
+ }
+
+ public static ServiceRecord createRecord(String persistence) {
+ return createRecord("01", persistence, "description");
+ }
+
+ public static ServiceRecord createRecord(String id, String persistence,
+ String description) {
+ ServiceRecord serviceRecord = new ServiceRecord();
+ serviceRecord.set(YarnRegistryAttributes.YARN_ID, id);
+ serviceRecord.description = description;
+ serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence);
+ return serviceRecord;
+ }
+
+ public static ServiceRecord createRecord(String id, String persistence,
+ String description, String data) {
+ return createRecord(id, persistence, description);
+ }
+}