You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/22 19:32:04 UTC
[38/51] [abbrv] hadoop git commit: YARN-5218. Initial core change for
DNS for YARN. Contributed by Jonathan Maron
YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3031e921
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3031e921
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3031e921
Branch: refs/heads/yarn-native-services
Commit: 3031e9213dea9bd6c1fa42e9ea18538a3bfe4b1b
Parents: 47a16db
Author: Jian He <ji...@apache.org>
Authored: Sun Jun 12 11:32:03 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Thu Dec 22 11:09:38 2016 -0800
----------------------------------------------------------------------
hadoop-project/pom.xml | 8 +
.../dev-support/findbugs-exclude.xml | 15 +
.../hadoop-yarn/hadoop-yarn-registry/pom.xml | 5 +
.../registry/client/api/DNSOperations.java | 60 +
.../client/api/DNSOperationsFactory.java | 78 +
.../registry/client/api/RegistryConstants.java | 111 +-
.../registry/client/impl/zk/CuratorService.java | 266 ++-
.../registry/client/impl/zk/ListenerHandle.java | 25 +
.../registry/client/impl/zk/PathListener.java | 30 +
.../types/yarn/YarnRegistryAttributes.java | 16 +-
.../dns/ApplicationServiceRecordProcessor.java | 353 ++++
.../server/dns/BaseServiceRecordProcessor.java | 469 ++++++
.../dns/ContainerServiceRecordProcessor.java | 278 ++++
.../server/dns/RecordCreatorFactory.java | 275 ++++
.../hadoop/registry/server/dns/RegistryDNS.java | 1534 ++++++++++++++++++
.../registry/server/dns/RegistryDNSServer.java | 290 ++++
.../registry/server/dns/SecureableZone.java | 151 ++
.../server/dns/ServiceRecordProcessor.java | 53 +
.../registry/server/dns/ZoneSelector.java | 33 +
.../registry/server/dns/package-info.java | 26 +
.../registry/server/dns/TestRegistryDNS.java | 561 +++++++
.../server/dns/TestSecureRegistryDNS.java | 44 +
.../test/resources/0.17.172.in-addr.arpa.zone | 36 +
.../src/test/resources/test.private | 32 +
24 files changed, 4661 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index d46bde0..0f07172 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -84,6 +84,7 @@
<zookeeper.version>3.4.6</zookeeper.version>
<curator.version>2.7.1</curator.version>
<findbugs.version>3.0.0</findbugs.version>
+ <dnsjava.version>2.1.7</dnsjava.version>
<tomcat.version>6.0.48</tomcat.version>
<guice.version>4.0</guice.version>
@@ -1174,6 +1175,13 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ <version>${dnsjava.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ab36a4e..1651a2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -583,4 +583,19 @@
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+ <Method name="addNIOTCP" />
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+ <Method name="addNIOUDP" />
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+ <Method name="serveNIOTCP" />
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
index 811964a..69313c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
@@ -80,6 +80,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
new file mode 100644
index 0000000..3abfb6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.service.Service;
+
+import java.io.IOException;
+
+/**
+ * DNS Operations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface DNSOperations extends Service {
+
+ /**
+ * Register a service based on a service record.
+ *
+ * @param path the ZK path.
+ * @param record record providing DNS registration info.
+ * @throws IOException Any other IO Exception.
+ */
+ void register(String path, ServiceRecord record)
+ throws IOException;
+
+
+ /**
+ * Delete a service's registered endpoints.
+ *
+ * If the operation returns without an error then the entry has been
+ * deleted.
+ *
+ * @param path the ZK path.
+ * @param record service record
+ * @throws IOException Any other IO Exception
+ *
+ */
+ void delete(String path, ServiceRecord record)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
new file mode 100644
index 0000000..1a8bb3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.server.dns.RegistryDNS;
+
+/**
+ * A factory for DNS operation service instances.
+ */
+public final class DNSOperationsFactory implements RegistryConstants {
+
+ /**
+ * DNS Implementation type.
+ */
+ public enum DNSImplementation {
+ DNSJAVA
+ }
+
+ private DNSOperationsFactory() {
+ }
+
+ /**
+ * Create and initialize a DNS operations instance.
+ *
+ * @param conf configuration
+ * @return a DNS operations instance
+ */
+ public static DNSOperations createInstance(Configuration conf) {
+ return createInstance("DNSOperations", DNSImplementation.DNSJAVA, conf);
+ }
+
+ /**
+ * Create and initialize a registry operations instance.
+ * Access rights will be determined from the configuration.
+ *
+ * @param name name of the instance
+ * @param impl the DNS implementation.
+ * @param conf configuration
+ * @return a registry operations instance
+ */
+ public static DNSOperations createInstance(String name,
+ DNSImplementation impl,
+ Configuration conf) {
+ Preconditions.checkArgument(conf != null, "Null configuration");
+ DNSOperations operations = null;
+ switch (impl) {
+ case DNSJAVA:
+ operations = new RegistryDNS(name);
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ String.format("%s is not available", impl.toString()));
+ }
+
+ //operations.init(conf);
+ return operations;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
index a6fe216..7115a4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
@@ -44,17 +44,106 @@ public interface RegistryConstants {
String ZK_PREFIX = REGISTRY_PREFIX + "zk.";
/**
+ * Prefix for dns-specific options: {@value}
+ * <p>
+ * For clients using other protocols, these options are not supported.
+ */
+ String DNS_PREFIX = REGISTRY_PREFIX + "dns.";
+
+ /**
* flag to indicate whether or not the registry should
- * be enabled in the RM: {@value}
+ * be enabled in the RM: {@value}.
*/
String KEY_REGISTRY_ENABLED = REGISTRY_PREFIX + "rm.enabled";
/**
- * Defaut value for enabling the registry in the RM: {@value}
+ * Defaut value for enabling the registry in the RM: {@value}.
*/
boolean DEFAULT_REGISTRY_ENABLED = false;
/**
+ * flag to indicate whether or not the registry should
+ * be enabled in the RM: {@value}.
+ */
+ String KEY_DNS_ENABLED = DNS_PREFIX + "enabled";
+
+ /**
+ * Defaut value for enabling the DNS in the Registry: {@value}.
+ */
+ boolean DEFAULT_DNS_ENABLED = false;
+
+ /**
+ * DNS domain name key.
+ */
+ String KEY_DNS_DOMAIN = DNS_PREFIX + "domain-name";
+
+ /**
+ * DNS bind address.
+ */
+ String KEY_DNS_BIND_ADDRESS = DNS_PREFIX + "bind-address";
+
+ /**
+ * DNS port number key.
+ */
+ String KEY_DNS_PORT = DNS_PREFIX + "bind-port";
+
+ /**
+ * Default DNS port number.
+ */
+ int DEFAULT_DNS_PORT = 53;
+
+ /**
+ * DNSSEC Enabled?
+ */
+ String KEY_DNSSEC_ENABLED = DNS_PREFIX + "dnssec.enabled";
+
+ /**
+ * DNSSEC Enabled?
+ */
+ String KEY_DNSSEC_PUBLIC_KEY = DNS_PREFIX + "public-key";
+
+ /**
+ * DNSSEC private key file.
+ */
+ String KEY_DNSSEC_PRIVATE_KEY_FILE = DNS_PREFIX + "private-key-file";
+
+ /**
+ * Default DNSSEC private key file path.
+ */
+ String DEFAULT_DNSSEC_PRIVATE_KEY_FILE =
+ "/etc/hadoop/conf/registryDNS.private";
+
+ /**
+ * Zone subnet.
+ */
+ String KEY_DNS_ZONE_SUBNET = DNS_PREFIX + "zone-subnet";
+
+ /**
+ * Zone subnet mask.
+ */
+ String KEY_DNS_ZONE_MASK = DNS_PREFIX + "zone-mask";
+
+ /**
+ * Zone subnet IP min.
+ */
+ String KEY_DNS_ZONE_IP_MIN = DNS_PREFIX + "zone-ip-min";
+
+ /**
+ * Zone subnet IP max.
+ */
+ String KEY_DNS_ZONE_IP_MAX = DNS_PREFIX + "zone-ip-max";
+
+ /**
+ * DNS Record TTL.
+ */
+ String KEY_DNS_TTL = DNS_PREFIX + "dns-ttl";
+
+ /**
+ * DNS Record TTL.
+ */
+ String KEY_DNS_ZONES_DIR = DNS_PREFIX + "zones-dir";
+
+ /**
* Key to set if the registry is secure: {@value}.
* Turning it on changes the permissions policy from "open access"
* to restrictions on kerberos with the option of
@@ -69,12 +158,12 @@ public interface RegistryConstants {
boolean DEFAULT_REGISTRY_SECURE = false;
/**
- * Root path in the ZK tree for the registry: {@value}
+ * Root path in the ZK tree for the registry: {@value}.
*/
String KEY_REGISTRY_ZK_ROOT = ZK_PREFIX + "root";
/**
- * Default root of the yarn registry: {@value}
+ * Default root of the yarn registry: {@value}.
*/
String DEFAULT_ZK_REGISTRY_ROOT = "/registry";
@@ -92,7 +181,7 @@ public interface RegistryConstants {
/**
* Registry client uses Kerberos: authentication is automatic from
- * logged in user
+ * logged in user.
*/
String REGISTRY_CLIENT_AUTH_KERBEROS = "kerberos";
@@ -104,12 +193,12 @@ public interface RegistryConstants {
String REGISTRY_CLIENT_AUTH_DIGEST = "digest";
/**
- * No authentication; client is anonymous
+ * No authentication; client is anonymous.
*/
String REGISTRY_CLIENT_AUTH_ANONYMOUS = "";
/**
- * Registry client authentication ID
+ * Registry client authentication ID.
* <p>
* This is only used in secure clusters with
* {@link #KEY_REGISTRY_CLIENT_AUTH} set to
@@ -134,17 +223,17 @@ public interface RegistryConstants {
/**
* List of hostname:port pairs defining the
- * zookeeper quorum binding for the registry {@value}
+ * zookeeper quorum binding for the registry {@value}.
*/
String KEY_REGISTRY_ZK_QUORUM = ZK_PREFIX + "quorum";
/**
- * The default zookeeper quorum binding for the registry: {@value}
+ * The default zookeeper quorum binding for the registry: {@value}.
*/
String DEFAULT_REGISTRY_ZK_QUORUM = "localhost:2181";
/**
- * Zookeeper session timeout in milliseconds: {@value}
+ * Zookeeper session timeout in milliseconds: {@value}.
*/
String KEY_REGISTRY_ZK_SESSION_TIMEOUT =
ZK_PREFIX + "session.timeout.ms";
@@ -259,7 +348,7 @@ public interface RegistryConstants {
String KEY_REGISTRY_CLIENT_JAAS_CONTEXT = REGISTRY_PREFIX + "jaas.context";
/**
- * default client-side registry JAAS context: {@value}
+ * default client-side registry JAAS context: {@value}.
*/
String DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT = "Client";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
index 7f35c3f..ad008c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
@@ -28,6 +28,9 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -36,14 +39,14 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException;
import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException;
import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
import org.apache.hadoop.registry.client.exceptions.RegistryIOException;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceStateException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -69,12 +72,12 @@ public class CuratorService extends CompositeService
LoggerFactory.getLogger(CuratorService.class);
/**
- * the Curator binding
+ * the Curator binding.
*/
private CuratorFramework curator;
/**
- * Path to the registry root
+ * Path to the registry root.
*/
private String registryRoot;
@@ -85,17 +88,17 @@ public class CuratorService extends CompositeService
private final RegistryBindingSource bindingSource;
/**
- * Security service
+ * Security service.
*/
private RegistrySecurity registrySecurity;
/**
- * the connection binding text for messages
+ * the connection binding text for messages.
*/
private String connectionDescription;
/**
- * Security connection diagnostics
+ * Security connection diagnostics.
*/
private String securityConnectionDiagnostics = "";
@@ -106,10 +109,16 @@ public class CuratorService extends CompositeService
private EnsembleProvider ensembleProvider;
/**
+ * Registry tree cache.
+ */
+ private TreeCache treeCache;
+
+ /**
* Construct the service.
- * @param name service name
+ *
+ * @param name service name
* @param bindingSource source of binding information.
- * If null: use this instance
+ * If null: use this instance
*/
public CuratorService(String name, RegistryBindingSource bindingSource) {
super(name);
@@ -122,7 +131,8 @@ public class CuratorService extends CompositeService
/**
* Create an instance using this service as the binding source (i.e. read
- * configuration options from the registry)
+ * configuration options from the registry).
+ *
* @param name service name
*/
public CuratorService(String name) {
@@ -131,7 +141,8 @@ public class CuratorService extends CompositeService
/**
* Init the service.
- * This is where the security bindings are set up
+ * This is where the security bindings are set up.
+ *
* @param conf configuration of the service
* @throws Exception
*/
@@ -155,6 +166,7 @@ public class CuratorService extends CompositeService
/**
* Start the service.
* This is where the curator instance is started.
+ *
* @throws Exception
*/
@Override
@@ -167,29 +179,35 @@ public class CuratorService extends CompositeService
}
/**
- * Close the ZK connection if it is open
+ * Close the ZK connection if it is open.
*/
@Override
protected void serviceStop() throws Exception {
IOUtils.closeStream(curator);
+
+ if (treeCache != null) {
+ treeCache.close();
+ }
super.serviceStop();
}
/**
- * Internal check that a service is in the live state
+ * Internal check that a service is in the live state.
+ *
* @throws ServiceStateException if not
*/
private void checkServiceLive() throws ServiceStateException {
if (!isInState(STATE.STARTED)) {
throw new ServiceStateException(
"Service " + getName() + " is in wrong state: "
- + getServiceState());
+ + getServiceState());
}
}
/**
* Flag to indicate whether or not the registry is secure.
* Valid once the service is inited.
+ *
* @return service security policy
*/
public boolean isSecure() {
@@ -197,7 +215,8 @@ public class CuratorService extends CompositeService
}
/**
- * Get the registry security helper
+ * Get the registry security helper.
+ *
* @return the registry security helper
*/
protected RegistrySecurity getRegistrySecurity() {
@@ -205,7 +224,8 @@ public class CuratorService extends CompositeService
}
/**
- * Build the security diagnostics string
+ * Build the security diagnostics string.
+ *
* @return a string for diagnostics
*/
protected String buildSecurityDiagnostics() {
@@ -224,6 +244,7 @@ public class CuratorService extends CompositeService
* Create a new curator instance off the root path; using configuration
* options provided in the service configuration to set timeouts and
* retry policy.
+ *
* @return the newly created creator
*/
private CuratorFramework createCurator() throws IOException {
@@ -250,14 +271,15 @@ public class CuratorService extends CompositeService
// set the security options
// build up the curator itself
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ CuratorFrameworkFactory.Builder builder =
+ CuratorFrameworkFactory.builder();
builder.ensembleProvider(ensembleProvider)
- .connectionTimeoutMs(connectionTimeout)
- .sessionTimeoutMs(sessionTimeout)
+ .connectionTimeoutMs(connectionTimeout)
+ .sessionTimeoutMs(sessionTimeout)
- .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
- retryCeiling,
- retryTimes));
+ .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
+ retryCeiling,
+ retryTimes));
// set up the builder AND any JVM context
registrySecurity.applySecurityEnvironment(builder);
@@ -273,21 +295,23 @@ public class CuratorService extends CompositeService
@Override
public String toString() {
return super.toString()
- + " " + bindingDiagnosticDetails();
+ + " " + bindingDiagnosticDetails();
}
/**
- * Get the binding diagnostics
+ * Get the binding diagnostics.
+ *
* @return a diagnostics string valid after the service is started.
*/
public String bindingDiagnosticDetails() {
return " Connection=\"" + connectionDescription + "\""
- + " root=\"" + registryRoot + "\""
- + " " + securityConnectionDiagnostics;
+ + " root=\"" + registryRoot + "\""
+ + " " + securityConnectionDiagnostics;
}
/**
- * Create a full path from the registry root and the supplied subdir
+ * Create a full path from the registry root and the supplied subdir.
+ *
* @param path path of operation
* @return an absolute path
* @throws IllegalArgumentException if the path is invalide
@@ -299,6 +323,7 @@ public class CuratorService extends CompositeService
/**
* Get the registry binding source ... this can be used to
* create new ensemble providers
+ *
* @return the registry binding source in use
*/
public RegistryBindingSource getBindingSource() {
@@ -308,23 +333,23 @@ public class CuratorService extends CompositeService
/**
* Create the ensemble provider for this registry, by invoking
* {@link RegistryBindingSource#supplyBindingInformation()} on
- * the provider stored in {@link #bindingSource}
+ * the provider stored in {@link #bindingSource}.
* Sets {@link #ensembleProvider} to that value;
* sets {@link #connectionDescription} to the binding info
* for use in toString and logging;
- *
*/
protected void createEnsembleProvider() {
BindingInformation binding = bindingSource.supplyBindingInformation();
connectionDescription = binding.description
- + " " + securityConnectionDiagnostics;
+ + " " + securityConnectionDiagnostics;
ensembleProvider = binding.ensembleProvider;
}
/**
* Supply the binding information.
* This implementation returns a fixed ensemble bonded to
- * the quorum supplied by {@link #buildConnectionString()}
+ * the quorum supplied by {@link #buildConnectionString()}.
+ *
* @return the binding information
*/
@Override
@@ -339,17 +364,19 @@ public class CuratorService extends CompositeService
/**
* Override point: get the connection string used to connect to
- * the ZK service
+ * the ZK service.
+ *
* @return a registry quorum
*/
protected String buildConnectionString() {
return getConfig().getTrimmed(KEY_REGISTRY_ZK_QUORUM,
- DEFAULT_REGISTRY_ZK_QUORUM);
+ DEFAULT_REGISTRY_ZK_QUORUM);
}
/**
- * Create an IOE when an operation fails
- * @param path path of operation
+ * Create an IOE when an operation fails.
+ *
+ * @param path path of operation
* @param operation operation attempted
* @param exception caught the exception caught
* @return an IOE to throw that contains the path and operation details.
@@ -361,8 +388,9 @@ public class CuratorService extends CompositeService
}
/**
- * Create an IOE when an operation fails
- * @param path path of operation
+ * Create an IOE when an operation fails.
+ *
+ * @param path path of operation
* @param operation operation attempted
* @param exception caught the exception caught
* @return an IOE to throw that contains the path and operation details.
@@ -385,9 +413,10 @@ public class CuratorService extends CompositeService
} else if (exception instanceof KeeperException.AuthFailedException) {
ioe = new AuthenticationFailedException(path,
"Authentication Failed: " + exception
- + "; " + securityConnectionDiagnostics,
+ + "; " + securityConnectionDiagnostics,
exception);
- } else if (exception instanceof KeeperException.NoChildrenForEphemeralsException) {
+ } else if (exception instanceof
+ KeeperException.NoChildrenForEphemeralsException) {
ioe = new NoChildrenForEphemeralsException(path,
"Cannot create a path under an ephemeral node: " + exception,
exception);
@@ -402,7 +431,7 @@ public class CuratorService extends CompositeService
} else {
ioe = new RegistryIOException(path,
"Failure of " + operation + " on " + path + ": " +
- exception.toString(),
+ exception.toString(),
exception);
}
if (ioe.getCause() == null) {
@@ -417,8 +446,8 @@ public class CuratorService extends CompositeService
* may create the same path before the create() operation is executed/
* propagated to the ZK node polled.
*
- * @param path path to create
- * @param acl ACL for path -used when creating a new entry
+ * @param path path to create
+ * @param acl ACL for path -used when creating a new entry
* @param createParents flag to trigger parent creation
* @return true iff the path was created
* @throws IOException
@@ -432,10 +461,11 @@ public class CuratorService extends CompositeService
}
/**
- * Stat the file
+ * Stat the file.
+ *
* @param path path of operation
* @return a curator stat entry
- * @throws IOException on a failure
+ * @throws IOException on a failure
* @throws PathNotFoundException if the path was not found
*/
public Stat zkStat(String path) throws IOException {
@@ -457,7 +487,8 @@ public class CuratorService extends CompositeService
}
/**
- * Get the ACLs of a path
+ * Get the ACLs of a path.
+ *
* @param path path of operation
* @return a possibly empty list of ACLs
* @throws IOException
@@ -481,12 +512,13 @@ public class CuratorService extends CompositeService
}
/**
- * Probe for a path existing
+ * Probe for a path existing.
+ *
* @param path path of operation
* @return true if the path was visible from the ZK server
* queried.
* @throws IOException on any exception other than
- * {@link PathNotFoundException}
+ * {@link PathNotFoundException}
*/
public boolean zkPathExists(String path) throws IOException {
checkServiceLive();
@@ -503,7 +535,8 @@ public class CuratorService extends CompositeService
}
/**
- * Verify a path exists
+ * Verify a path exists.
+ *
* @param path path of operation
* @throws PathNotFoundException if the path is absent
* @throws IOException
@@ -514,11 +547,12 @@ public class CuratorService extends CompositeService
}
/**
- * Create a directory. It is not an error if it already exists
- * @param path path to create
- * @param mode mode for path
+ * Create a directory. It is not an error if it already exists.
+ *
+ * @param path path to create
+ * @param mode mode for path
* @param createParents flag to trigger parent creation
- * @param acls ACL for path
+ * @param acls ACL for path
* @throws IOException any problem
*/
public boolean zkMkPath(String path,
@@ -558,9 +592,10 @@ public class CuratorService extends CompositeService
}
/**
- * Recursively make a path
+ * Recursively make a path.
+ *
* @param path path to create
- * @param acl ACL for path
+ * @param acl ACL for path
* @throws IOException any problem
*/
public void zkMkParentPath(String path,
@@ -574,7 +609,8 @@ public class CuratorService extends CompositeService
/**
* Create a path with given data. byte[0] is used for a path
- * without data
+ * without data.
+ *
* @param path path of operation
* @param data initial data
* @param acls
@@ -600,7 +636,8 @@ public class CuratorService extends CompositeService
}
/**
- * Update the data for a path
+ * Update the data for a path.
+ *
* @param path path of operation
* @param data new data
* @throws IOException
@@ -620,13 +657,14 @@ public class CuratorService extends CompositeService
}
/**
- * Create or update an entry
- * @param path path
- * @param data data
- * @param acl ACL for path -used when creating a new entry
+ * Create or update an entry.
+ *
+ * @param path path
+ * @param data data
+ * @param acl ACL for path -used when creating a new entry
* @param overwrite enable overwrite
- * @throws IOException
* @return true if the entry was created, false if it was simply updated.
+ * @throws IOException
*/
public boolean zkSet(String path,
CreateMode mode,
@@ -649,12 +687,13 @@ public class CuratorService extends CompositeService
/**
* Delete a directory/directory tree.
- * It is not an error to delete a path that does not exist
- * @param path path of operation
- * @param recursive flag to trigger recursive deletion
+ * It is not an error to delete a path that does not exist.
+ *
+ * @param path path of operation
+ * @param recursive flag to trigger recursive deletion
* @param backgroundCallback callback; this being set converts the operation
- * into an async/background operation.
- * task
+ * into an async/background operation.
+ * task
* @throws IOException on problems other than no-such-path
*/
public void zkDelete(String path,
@@ -682,7 +721,8 @@ public class CuratorService extends CompositeService
}
/**
- * List all children of a path
+ * List all children of a path.
+ *
* @param path path of operation
* @return a possibly empty list of children
* @throws IOException
@@ -703,7 +743,8 @@ public class CuratorService extends CompositeService
}
/**
- * Read data on a path
+ * Read data on a path.
+ *
* @param path path of operation
* @return the data
* @throws IOException read failure
@@ -724,9 +765,10 @@ public class CuratorService extends CompositeService
/**
* Return a path dumper instance which can do a full dump
* of the registry tree in its <code>toString()</code>
- * operation
- * @return a class to dump the registry
+ * operation.
+ *
* @param verbose verbose flag - includes more details (such as ACLs)
+ * @return a class to dump the registry
*/
public ZKPathDumper dumpPath(boolean verbose) {
return new ZKPathDumper(curator, registryRoot, verbose);
@@ -734,7 +776,8 @@ public class CuratorService extends CompositeService
/**
* Add a new write access entry for all future write operations.
- * @param id ID to use
+ *
+ * @param id ID to use
* @param pass password
* @throws IOException on any failure to build the digest
*/
@@ -746,16 +789,16 @@ public class CuratorService extends CompositeService
}
/**
- * Clear all write accessors
+ * Clear all write accessors.
*/
public void clearWriteAccessors() {
getRegistrySecurity().resetDigestACLs();
}
-
/**
* Diagnostics method to dump a registry robustly.
- * Any exception raised is swallowed
+ * Any exception raised is swallowed.
+ *
* @param verbose verbose path dump
* @return the registry tree
*/
@@ -769,4 +812,79 @@ public class CuratorService extends CompositeService
}
return "";
}
+
+ /**
+ * Registers a listener to path related events.
+ *
+ * @param listener the listener.
+ * @return a handle allowing for the management of the listener.
+ * @throws Exception if registration fails due to error.
+ */
+ public ListenerHandle registerPathListener(final PathListener listener)
+ throws Exception {
+
+ final TreeCacheListener pathChildrenCacheListener =
+ new TreeCacheListener() {
+
+ public void childEvent(CuratorFramework curatorFramework,
+ TreeCacheEvent event)
+ throws Exception {
+ String path = null;
+ if (event != null && event.getData() != null) {
+ path = event.getData().getPath();
+ }
+ assert event != null;
+ switch (event.getType()) {
+ case NODE_ADDED:
+ LOG.info("Informing listener of added node {}", path);
+ listener.nodeAdded(path);
+
+ break;
+
+ case NODE_REMOVED:
+ LOG.info("Informing listener of removed node {}", path);
+ listener.nodeRemoved(path);
+
+ break;
+
+ case NODE_UPDATED:
+ LOG.info("Informing listener of updated node {}", path);
+ listener.nodeAdded(path);
+
+ break;
+
+ default:
+ // do nothing
+ break;
+
+ }
+ }
+ };
+ treeCache.getListenable().addListener(pathChildrenCacheListener);
+
+ return new ListenerHandle() {
+ @Override
+ public void remove() {
+ treeCache.getListenable().removeListener(pathChildrenCacheListener);
+ }
+ };
+
+ }
+
+ // TODO: should caches be stopped and then restarted if need be?
+
+ /**
+ * Create the tree cache that monitors the registry for node addition, update,
+ * and deletion.
+ *
+ * @throws Exception if any issue arises during monitoring.
+ */
+ public void monitorRegistryEntries()
+ throws Exception {
+ String registryPath =
+ getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+ RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+ treeCache = new TreeCache(curator, registryPath);
+ treeCache.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
new file mode 100644
index 0000000..e43dbbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.client.impl.zk;
+
+/**
+ *
+ */
+public interface ListenerHandle {
+ void remove();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
new file mode 100644
index 0000000..db1e509
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.client.impl.zk;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface PathListener {
+
+ void nodeAdded(String path) throws IOException;
+
+ void nodeRemoved(String path) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
index 7b78932..5eaa9c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
@@ -19,13 +19,23 @@
package org.apache.hadoop.registry.client.types.yarn;
/**
- * YARN specific attributes in the registry
+ * YARN specific attributes in the registry.
*/
-public class YarnRegistryAttributes {
+public final class YarnRegistryAttributes {
/**
- * ID. For containers: container ID. For application instances, application ID.
+ * Hidden constructor.
+ */
+ private YarnRegistryAttributes() {
+ }
+
+ /**
+ * ID. For containers: container ID. For application instances,
+ * application ID.
*/
public static final String YARN_ID = "yarn:id";
public static final String YARN_PERSISTENCE = "yarn:persistence";
+ public static final String YARN_PATH = "yarn:path";
+ public static final String YARN_HOSTNAME = "yarn:hostname";
+ public static final String YARN_IP = "yarn:ip";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
new file mode 100644
index 0000000..e6a1b5b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.Type;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A processor for generating application DNS records from registry service
+ * records.
+ */
+public class ApplicationServiceRecordProcessor extends
+ BaseServiceRecordProcessor {
+
+ /**
+ * Create an application service record processor.
+ *
+ * @param record the service record
+ * @param path the service record registry node path
+ * @param domain the DNS zone/domain name
+ * @param zoneSelector returns the zone associated with the provided name.
+ * @throws Exception if an issue is generated during instantiation.
+ */
+ public ApplicationServiceRecordProcessor(
+ ServiceRecord record, String path, String domain,
+ ZoneSelector zoneSelector) throws Exception {
+ super(record, path, domain, zoneSelector);
+ }
+
+ /**
+ * Initializes the DNS record type to descriptor mapping based on the
+ * provided service record.
+ *
+ * @param serviceRecord the registry service record.
+ * @throws Exception if an issue is encountered.
+ */
+ @Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
+ throws Exception {
+ for (int type : getRecordTypes()) {
+ switch (type) {
+ case Type.A:
+ createAInfo(serviceRecord);
+ break;
+ case Type.AAAA:
+ createAAAAInfo(serviceRecord);
+ break;
+ case Type.TXT:
+ createTXTInfo(serviceRecord);
+ break;
+ case Type.CNAME:
+ createCNAMEInfo(serviceRecord);
+ break;
+ case Type.SRV:
+ createSRVInfo(serviceRecord);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type " + type);
+
+ }
+ }
+ }
+
+ /**
+ * Create an application TXT record descriptor.
+ *
+ * @param serviceRecord the service record.
+ * @throws Exception if there is an issue during descriptor creation.
+ */
+ protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
+ List<Endpoint> endpoints = serviceRecord.external;
+ List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+ TXTApplicationRecordDescriptor txtInfo;
+ for (Endpoint endpoint : endpoints) {
+ txtInfo = new TXTApplicationRecordDescriptor(
+ serviceRecord, endpoint);
+ recordDescriptors.add(txtInfo);
+ }
+ registerRecordDescriptor(Type.TXT, recordDescriptors);
+ }
+
+ /**
+ * Create an application SRV record descriptor.
+ *
+ * @param serviceRecord the service record.
+ * @throws Exception if there is an issue during descriptor creation.
+ */
+ protected void createSRVInfo(ServiceRecord serviceRecord) throws Exception {
+ List<Endpoint> endpoints = serviceRecord.external;
+ List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+ SRVApplicationRecordDescriptor srvInfo;
+ for (Endpoint endpoint : endpoints) {
+ srvInfo = new SRVApplicationRecordDescriptor(
+ serviceRecord, endpoint);
+ recordDescriptors.add(srvInfo);
+ }
+ registerRecordDescriptor(Type.SRV, recordDescriptors);
+ }
+
+ /**
+ * Create an application CNAME record descriptor.
+ *
+ * @param serviceRecord the service record.
+ * @throws Exception if there is an issue during descriptor creation.
+ */
+ protected void createCNAMEInfo(ServiceRecord serviceRecord) throws Exception {
+ List<Endpoint> endpoints = serviceRecord.external;
+ List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+ CNAMEApplicationRecordDescriptor cnameInfo;
+ for (Endpoint endpoint : endpoints) {
+ cnameInfo = new CNAMEApplicationRecordDescriptor(
+ serviceRecord, endpoint);
+ recordDescriptors.add(cnameInfo);
+ }
+ registerRecordDescriptor(Type.CNAME, recordDescriptors);
+ }
+
+ /**
+ * Create an application AAAA record descriptor.
+ *
+ * @param record the service record.
+ * @throws Exception if there is an issue during descriptor creation.
+ */
+ protected void createAAAAInfo(ServiceRecord record)
+ throws Exception {
+ AAAAApplicationRecordDescriptor
+ recordInfo = new AAAAApplicationRecordDescriptor(
+ getPath(), record);
+ registerRecordDescriptor(Type.AAAA, recordInfo);
+ }
+
+ /**
+ * Create an application A record descriptor.
+ *
+ * @param record the service record.
+ * @throws Exception if there is an issue during descriptor creation.
+ */
+ protected void createAInfo(ServiceRecord record) throws Exception {
+ AApplicationRecordDescriptor recordInfo = new AApplicationRecordDescriptor(
+ getPath(), record);
+ registerRecordDescriptor(Type.A, recordInfo);
+ }
+
+ /**
+ * Returns the record types associated with a container service record.
+ *
+ * @return the record type array
+ */
+ @Override public int[] getRecordTypes() {
+ return new int[] {Type.A, Type.AAAA, Type.CNAME, Type.SRV, Type.TXT};
+ }
+
+ /**
+ * An application TXT record descriptor.
+ */
+ class TXTApplicationRecordDescriptor
+ extends ApplicationRecordDescriptor<List<String>> {
+
+ /**
+ * Creates an application TXT record descriptor.
+ *
+ * @param record service record
+ * @throws Exception
+ */
+ public TXTApplicationRecordDescriptor(ServiceRecord record,
+ Endpoint endpoint) throws Exception {
+ super(record, endpoint);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ *
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord)
+ throws Exception {
+ if (getEndpoint() != null) {
+ this.setNames(new Name[] {getServiceName(), getEndpointName()});
+ this.setTarget(getTextRecords(getEndpoint()));
+ }
+ }
+
+ }
+
+ /**
+ * An application SRV record descriptor.
+ */
+ class SRVApplicationRecordDescriptor extends
+ ApplicationRecordDescriptor<RecordCreatorFactory.HostPortInfo> {
+
+ /**
+ * Creates an application SRV record descriptor.
+ *
+ * @param record service record
+ * @throws Exception
+ */
+ public SRVApplicationRecordDescriptor(ServiceRecord record,
+ Endpoint endpoint) throws Exception {
+ super(record, endpoint);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ *
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord)
+ throws Exception {
+ if (getEndpoint() != null) {
+ this.setNames(new Name[] {getServiceName(), getEndpointName()});
+ this.setTarget(new RecordCreatorFactory.HostPortInfo(
+ Name.fromString(getHost(getEndpoint()) + "."), getPort(
+ getEndpoint())));
+ }
+ }
+
+ }
+
+ /**
+ * An application CNAME record descriptor.
+ */
+ class CNAMEApplicationRecordDescriptor extends
+ ApplicationRecordDescriptor<Name> {
+
+ /**
+ * Creates an application CNAME record descriptor.
+ *
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public CNAMEApplicationRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(record);
+ }
+
+ /**
+ * Creates an application CNAME record descriptor. This descriptor is the
+ * source for API related CNAME records.
+ *
+ * @param record service record
+ * @param endpoint the API endpoint
+ * @throws Exception
+ */
+ public CNAMEApplicationRecordDescriptor(ServiceRecord record,
+ Endpoint endpoint) throws Exception {
+ super(record, endpoint);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ *
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord)
+ throws Exception {
+ if (getEndpoint() != null) {
+ this.setNames(new Name[] {getEndpointName()});
+ this.setTarget(getServiceName());
+ }
+ }
+
+ }
+
+ /**
+ * An application A record descriptor.
+ */
+ class AApplicationRecordDescriptor
+ extends ApplicationRecordDescriptor<InetAddress> {
+
+ /**
+ * Creates an application A record descriptor.
+ *
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public AApplicationRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ *
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord)
+ throws Exception {
+ this.setNames(new Name[] {getServiceName()});
+ List<Endpoint> endpoints = serviceRecord.external;
+ // TODO: do we need a "hostname" attribute for an application record or
+ // can we rely on the first endpoint record.
+ this.setTarget(InetAddress.getByName(
+ getHost(endpoints.get(0))));
+ }
+
+ }
+
+ /**
+ * An application AAAA record descriptor.
+ */
+ class AAAAApplicationRecordDescriptor extends AApplicationRecordDescriptor {
+
+ /**
+ * Creates an application AAAA record descriptor.
+ *
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public AAAAApplicationRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(path, record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ *
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord)
+ throws Exception {
+ super.init(serviceRecord);
+ try {
+ this.setTarget(getIpv6Address(getTarget()));
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
new file mode 100644
index 0000000..1289fb3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.ReverseMap;
+import org.xbill.DNS.TextParseException;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Provides common service record processing logic.
+ */
+public abstract class BaseServiceRecordProcessor
+ implements ServiceRecordProcessor {
+
+ private final ZoneSelector zoneSelctor;
+ private Map<Integer, List<RecordDescriptor>> typeToDescriptorMap =
+ new HashMap<>();
+ private String path;
+ private String domain;
+
+ private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?");
+ private static final String SLIDER_API_PREFIX =
+ "classpath:org.apache.slider.";
+ private static final String HTTP_API_TYPE = "http://";
+
+ /**
+ * Creates a service record processor.
+ *
+ * @param record the service record.
+ * @param path the node path for the record in the registry.
+ * @param domain the target DNS domain for the service record
+ * associated DNS records.
+ * @param zoneSelector A selector of the best zone for a given DNS name.
+ * @throws Exception if an issue is generated during instantiation.
+ */
+ public BaseServiceRecordProcessor(ServiceRecord record, String path,
+ String domain, ZoneSelector zoneSelector)
+ throws Exception {
+ this.setPath(path);
+ this.domain = domain;
+ this.zoneSelctor = zoneSelector;
+ initTypeToInfoMapping(record);
+ }
+
+ /**
+ * Return the username found in the ZK path.
+ *
+ * @param recPath the ZK recPath.
+ * @return the user name.
+ */
+ protected String getUsername(String recPath) {
+ String user = "anonymous";
+ Matcher matcher = USER_NAME.matcher(recPath);
+ if (matcher.find()) {
+ user = matcher.group(1);
+ }
+ return user;
+ }
+
+ /**
+ * Return the IPv6 mapped address for the provided IPv4 address. Utilized
+ * to create corresponding AAAA records.
+ *
+ * @param address the IPv4 address.
+ * @return the mapped IPv6 address.
+ * @throws UnknownHostException
+ */
+ static InetAddress getIpv6Address(InetAddress address)
+ throws UnknownHostException {
+ String[] octets = address.getHostAddress().split("\\.");
+ byte[] octetBytes = new byte[4];
+ for (int i = 0; i < 4; ++i) {
+ octetBytes[i] = (byte) Integer.parseInt(octets[i]);
+ }
+
+ byte[] ipv4asIpV6addr = new byte[16];
+ ipv4asIpV6addr[10] = (byte) 0xff;
+ ipv4asIpV6addr[11] = (byte) 0xff;
+ ipv4asIpV6addr[12] = octetBytes[0];
+ ipv4asIpV6addr[13] = octetBytes[1];
+ ipv4asIpV6addr[14] = octetBytes[2];
+ ipv4asIpV6addr[15] = octetBytes[3];
+
+ return Inet6Address.getByAddress(null, ipv4asIpV6addr, 0);
+ }
+
+ /**
+ * Reverse the string representation of the input IP address.
+ *
+ * @param ip the string representation of the IP address.
+ * @return the reversed IP address.
+ * @throws UnknownHostException if the ip is unknown.
+ */
+ protected Name reverseIP(String ip) throws UnknownHostException {
+ return ReverseMap.fromAddress(ip);
+ }
+
+ /**
+ * Manages the creation and registration of service record generated DNS
+ * records.
+ *
+ * @param command the DNS registration command object (e.g. add_record,
+ * remove record)
+ * @throws IOException if the creation or registration generates an issue.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void manageDNSRecords(RegistryDNS.RegistryCommand command)
+ throws IOException {
+ for (Map.Entry<Integer, List<RecordDescriptor>> entry :
+ typeToDescriptorMap.entrySet()) {
+ for (RecordDescriptor recordDescriptor : entry.getValue()) {
+ for (Name name : recordDescriptor.getNames()) {
+ RecordCreatorFactory.RecordCreator recordCreator =
+ RecordCreatorFactory.getRecordCreator(entry.getKey());
+ command.exec(zoneSelctor.findBestZone(name),
+ recordCreator.create(name, recordDescriptor.getTarget()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Add the DNS record descriptor object to the record type to descriptor
+ * mapping.
+ *
+ * @param type the DNS record type.
+ * @param recordDescriptor the DNS record descriptor
+ */
+ protected void registerRecordDescriptor(int type,
+ RecordDescriptor recordDescriptor) {
+ List<RecordDescriptor> infos = new ArrayList<>();
+ infos.add(recordDescriptor);
+ typeToDescriptorMap.put(type, infos);
+ }
+
+ /**
+ * Add the DNS record descriptor objects to the record type to descriptor
+ * mapping.
+ *
+ * @param type the DNS record type.
+ * @param recordDescriptors the DNS record descriptors
+ */
+ protected void registerRecordDescriptor(int type,
+ List<RecordDescriptor> recordDescriptors) {
+ typeToDescriptorMap.put(type, recordDescriptors);
+ }
+
+ /**
+ * Return the path associated with the record.
+ * @return the path.
+ */
+ protected String getPath() {
+ return path;
+ }
+
+ /**
+ * Set the path associated with the record.
+ * @param path the path.
+ */
+ protected void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * A descriptor container the information to be populated into a DNS record.
+ *
+ * @param <T> the DNS record type/class.
+ */
+ abstract class RecordDescriptor<T> {
+ private final ServiceRecord record;
+ private Name[] names;
+ private T target;
+
+ /**
+ * Creates a DNS record descriptor.
+ *
+ * @param record the associated service record.
+ */
+ public RecordDescriptor(ServiceRecord record) {
+ this.record = record;
+ }
+
+ /**
+ * Returns the DNS names associated with the record type and information.
+ *
+ * @return the array of names.
+ */
+ public Name[] getNames() {
+ return names;
+ }
+
+ /**
+ * Return the target object for the DNS record.
+ *
+ * @return the DNS record target.
+ */
+ public T getTarget() {
+ return target;
+ }
+
+ /**
+ * Initializes the names and information for this DNS record descriptor.
+ *
+ * @param serviceRecord the service record.
+ * @throws Exception
+ */
+ protected abstract void init(ServiceRecord serviceRecord) throws Exception;
+
+ /**
+ * Returns the service record.
+ * @return the service record.
+ */
+ public ServiceRecord getRecord() {
+ return record;
+ }
+
+ /**
+ * Sets the names associated with the record type and information.
+ * @param names the names.
+ */
+ public void setNames(Name[] names) {
+ this.names = names;
+ }
+
+ /**
+ * Sets the target object associated with the record.
+ * @param target the target.
+ */
+ public void setTarget(T target) {
+ this.target = target;
+ }
+ }
+
+ /**
+ * A container-based DNS record descriptor.
+ *
+ * @param <T> the DNS record type/class.
+ */
+ abstract class ContainerRecordDescriptor<T> extends RecordDescriptor<T> {
+
+ public ContainerRecordDescriptor(String path, ServiceRecord record)
+ throws Exception {
+ super(record);
+ init(record);
+ }
+
+ /**
+ * Returns the DNS name constructed from the YARN container ID.
+ *
+ * @return the container ID name.
+ * @throws TextParseException
+ */
+ protected Name getContainerIDName() throws TextParseException {
+ String containerID = RegistryPathUtils.lastPathEntry(getPath());
+ containerID = containerID.replace("container", "ctr");
+ return Name.fromString(String.format("%s.%s", containerID, domain));
+ }
+
+ /**
+ * Returns the DNS name constructed from the container role/component name.
+ *
+ * @return the DNS naem.
+ * @throws PathNotFoundException
+ * @throws TextParseException
+ */
+ protected Name getContainerName()
+ throws PathNotFoundException, TextParseException {
+ String service = RegistryPathUtils.lastPathEntry(
+ RegistryPathUtils.parentOf(RegistryPathUtils.parentOf(getPath())));
+ String description = getRecord().description.toLowerCase();
+ String user = getUsername(getPath());
+ return Name.fromString(MessageFormat.format("{0}.{1}.{2}.{3}",
+ description,
+ service,
+ user,
+ domain));
+ }
+
+ }
+
+ /**
+ * An application-based DNS record descriptor.
+ *
+ * @param <T> the DNS record type/class.
+ */
+ abstract class ApplicationRecordDescriptor<T> extends RecordDescriptor<T> {
+
+ private Endpoint srEndpoint;
+
+ /**
+ * Creates an application associated DNS record descriptor.
+ *
+ * @param record the service record.
+ * @throws Exception
+ */
+ public ApplicationRecordDescriptor(ServiceRecord record)
+ throws Exception {
+ this(record, null);
+ }
+
+ /**
+ * Creates an application associated DNS record descriptor. The endpoint
+ * is leverated to create an associated application API record.
+ *
+ * @param record the service record.
+ * @param endpoint an API endpoint.
+ * @throws Exception
+ */
+ public ApplicationRecordDescriptor(ServiceRecord record,
+ Endpoint endpoint) throws Exception {
+ super(record);
+ this.setEndpoint(endpoint);
+ init(record);
+ }
+
+ /**
+ * Get the service's DNS name for registration.
+ *
+ * @return the service DNS name.
+ * @throws TextParseException
+ */
+ protected Name getServiceName() throws TextParseException {
+ String user = getUsername(getPath());
+ String service =
+ String.format("%s.%s.%s",
+ RegistryPathUtils.lastPathEntry(getPath()),
+ user,
+ domain);
+ return Name.fromString(service);
+ }
+
+ /**
+ * Get the host from the provided endpoint record.
+ *
+ * @param endpoint the endpoint info.
+ * @return the host name.
+ */
+ protected String getHost(Endpoint endpoint) {
+ String host = null;
+ // assume one address for now
+ Map<String, String> address = endpoint.addresses.get(0);
+ if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
+ host = address.get(AddressTypes.ADDRESS_HOSTNAME_FIELD);
+ } else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+ URI uri = URI.create(address.get("uri"));
+ host = uri.getHost();
+ }
+ return host;
+ }
+
+ /**
+ * Get the post from the provided endpoint record.
+ *
+ * @param endpoint the endpoint info.
+ * @return the port.
+ */
+ protected int getPort(Endpoint endpoint) {
+ int port = -1;
+ // assume one address for now
+ Map<String, String> address = endpoint.addresses.get(0);
+ if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
+ port = Integer.parseInt(address.get(AddressTypes.ADDRESS_PORT_FIELD));
+ } else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+ URI uri = URI.create(address.get("uri"));
+ port = uri.getPort();
+ }
+ return port;
+ }
+
+ /**
+ * Get the list of strings that can be related in a TXT record for the given
+ * endpoint.
+ *
+ * @param endpoint the endpoint information.
+ * @return the list of strings relating endpoint info.
+ */
+ protected List<String> getTextRecords(Endpoint endpoint) {
+ Map<String, String> address = endpoint.addresses.get(0);
+ List<String> txtRecs = new ArrayList<String>();
+ txtRecs.add("api=" + getDNSApiFragment(endpoint.api));
+ if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+ URI uri = URI.create(address.get("uri"));
+ txtRecs.add("path=" + uri.getPath());
+ }
+ return txtRecs;
+ }
+
+ /**
+ * Get an API name that is compatible with DNS standards (and shortened).
+ *
+ * @param api the api indicator.
+ * @return the shortened and compatible api name.
+ */
+ protected String getDNSApiFragment(String api) {
+ String dnsApi = null;
+ if (api.startsWith(SLIDER_API_PREFIX)) {
+ dnsApi = api.substring(SLIDER_API_PREFIX.length());
+ } else if (api.startsWith(HTTP_API_TYPE)) {
+ dnsApi = "http";
+ }
+ assert dnsApi != null;
+ dnsApi = dnsApi.replace('.', '-');
+ return dnsApi;
+ }
+
+ /**
+ * Return the DNS name associated with the API endpoint.
+ *
+ * @return the name.
+ * @throws TextParseException
+ */
+ protected Name getEndpointName() throws TextParseException {
+ return Name.fromString(String.format("%s-api.%s",
+ getDNSApiFragment(
+ getEndpoint().api),
+ getServiceName()));
+ }
+
+ /**
+ * Returns the endpoint.
+ * @return the endpoint.
+ */
+ public Endpoint getEndpoint() {
+ return srEndpoint;
+ }
+
+ /**
+ * Sets the endpoint.
+ * @param endpoint the endpoint.
+ */
+ public void setEndpoint(
+ Endpoint endpoint) {
+ this.srEndpoint = endpoint;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
new file mode 100644
index 0000000..75873d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.TextParseException;
+import org.xbill.DNS.Type;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A processor for generating container DNS records from registry service
+ * records.
+ */
+public class ContainerServiceRecordProcessor extends
+ BaseServiceRecordProcessor {
+
+ /**
+ * Create a container service record processor.
+ * @param record the service record
+ * @param path the service record registry node path
+ * @param domain the DNS zone/domain name
+ * @param zoneSelector returns the zone associated with the provided name.
+ * @throws Exception if an issue is generated during instantiation.
+ */
+ public ContainerServiceRecordProcessor(
+ ServiceRecord record, String path, String domain,
+ ZoneSelector zoneSelector) throws Exception {
+ super(record, path, domain, zoneSelector);
+ }
+
+ /**
+ * Initializes the DNS record type to descriptor mapping based on the
+ * provided service record.
+ * @param serviceRecord the registry service record.
+ * @throws Exception if an issue arises.
+ */
+ @Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
+ throws Exception {
+ if (serviceRecord.get(YarnRegistryAttributes.YARN_IP) != null) {
+ for (int type : getRecordTypes()) {
+ switch (type) {
+ case Type.A:
+ createAInfo(serviceRecord);
+ break;
+ case Type.AAAA:
+ createAAAAInfo(serviceRecord);
+ break;
+ case Type.PTR:
+ createPTRInfo(serviceRecord);
+ break;
+ case Type.TXT:
+ createTXTInfo(serviceRecord);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type " + type);
+
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a container TXT record descriptor.
+ * @param serviceRecord the service record.
+ * @throws Exception if the descriptor creation yields an issue.
+ */
+ protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
+ TXTContainerRecordDescriptor txtInfo =
+ new TXTContainerRecordDescriptor(getPath(), serviceRecord);
+ registerRecordDescriptor(Type.TXT, txtInfo);
+ }
+
+ /**
+ * Creates a container PTR record descriptor.
+ * @param record the service record.
+ * @throws Exception if the descriptor creation yields an issue.
+ */
+ protected void createPTRInfo(ServiceRecord record) throws Exception {
+ PTRContainerRecordDescriptor
+ ptrInfo = new PTRContainerRecordDescriptor(getPath(), record);
+ registerRecordDescriptor(Type.PTR, ptrInfo);
+ }
+
+ /**
+ * Creates a container AAAA (IPv6) record descriptor.
+ * @param record the service record
+ * @throws Exception if the descriptor creation yields an issue.
+ */
+ protected void createAAAAInfo(ServiceRecord record)
+ throws Exception {
+ AAAAContainerRecordDescriptor
+ recordInfo = new AAAAContainerRecordDescriptor(
+ getPath(), record);
+ registerRecordDescriptor(Type.AAAA, recordInfo);
+ }
+
+ /**
+ * Creates a container A (IPv4) record descriptor.
+ * @param record service record.
+ * @throws Exception if the descriptor creation yields an issue.
+ */
+ protected void createAInfo(ServiceRecord record) throws Exception {
+ AContainerRecordDescriptor recordInfo = new AContainerRecordDescriptor(
+ getPath(), record);
+ registerRecordDescriptor(Type.A, recordInfo);
+ }
+
+ /**
+ * Returns the record types associated with a container service record.
+ * @return the record type array
+ */
+ @Override public int[] getRecordTypes() {
+ return new int[] {Type.A, Type.AAAA, Type.PTR, Type.TXT};
+ }
+
+ /**
+ * A container TXT record descriptor.
+ */
+ class TXTContainerRecordDescriptor
+ extends ContainerRecordDescriptor<List<String>> {
+
+ /**
+ * Creates a container TXT record descriptor.
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public TXTContainerRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(path, record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord) {
+ try {
+ this.setNames(new Name[] {getContainerIDName()});
+ } catch (TextParseException e) {
+ // log
+ }
+ List<String> txts = new ArrayList<>();
+ txts.add("id=" + serviceRecord.get(YarnRegistryAttributes.YARN_ID));
+ this.setTarget(txts);
+ }
+
+ }
+
+ /**
+ * A container PTR record descriptor.
+ */
+ class PTRContainerRecordDescriptor extends ContainerRecordDescriptor<Name> {
+
+ /**
+ * Creates a container PTR record descriptor.
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public PTRContainerRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(path, record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord) {
+ String host = serviceRecord.get(YarnRegistryAttributes.YARN_HOSTNAME);
+ String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
+ Name reverseLookupName = null;
+ if (host != null && ip != null) {
+ try {
+ reverseLookupName = reverseIP(ip);
+ } catch (UnknownHostException e) {
+ //LOG
+ }
+ }
+ this.setNames(new Name[] {reverseLookupName});
+ try {
+ this.setTarget(getContainerIDName());
+ } catch (TextParseException e) {
+ //LOG
+ }
+ }
+
+ }
+
+
+ /**
+ * A container A record descriptor.
+ */
+ class AContainerRecordDescriptor
+ extends ContainerRecordDescriptor<InetAddress> {
+
+ /**
+ * Creates a container A record descriptor.
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public AContainerRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(path, record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord) {
+ String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
+ if (ip == null) {
+ throw new IllegalArgumentException("No IP specified");
+ }
+ try {
+ this.setTarget(InetAddress.getByName(ip));
+ this.setNames(new Name[] {getContainerName(), getContainerIDName()});
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ }
+
+ /**
+ * A container AAAA record descriptor.
+ */
+ class AAAAContainerRecordDescriptor extends AContainerRecordDescriptor {
+
+ /**
+ * Creates a container AAAA record descriptor.
+ * @param path registry path for service record
+ * @param record service record
+ * @throws Exception
+ */
+ public AAAAContainerRecordDescriptor(String path,
+ ServiceRecord record) throws Exception {
+ super(path, record);
+ }
+
+ /**
+ * Initializes the descriptor parameters.
+ * @param serviceRecord the service record.
+ */
+ @Override protected void init(ServiceRecord serviceRecord) {
+ super.init(serviceRecord);
+ try {
+ this.setTarget(getIpv6Address(getTarget()));
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3031e921/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
new file mode 100644
index 0000000..23f9501
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.xbill.DNS.AAAARecord;
+import org.xbill.DNS.ARecord;
+import org.xbill.DNS.CNAMERecord;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.PTRRecord;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SRVRecord;
+import org.xbill.DNS.TXTRecord;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import static org.xbill.DNS.Type.*;
+
+/**
+ * A factory for creating DNS records.
+ */
+public final class RecordCreatorFactory {
+ private static long ttl;
+
+ /**
+ * Private constructor.
+ */
+ private RecordCreatorFactory() {
+ }
+
+ /**
+ * Returns the DNS record creator for the provided type.
+ *
+ * @param type the DNS record type.
+ * @return the record creator.
+ */
+ static RecordCreator getRecordCreator(int type) {
+ switch (type) {
+ case A:
+ return new ARecordCreator();
+ case CNAME:
+ return new CNAMERecordCreator();
+ case TXT:
+ return new TXTRecordCreator();
+ case AAAA:
+ return new AAAARecordCreator();
+ case PTR:
+ return new PTRRecordCreator();
+ case SRV:
+ return new SRVRecordCreator();
+ default:
+ throw new IllegalArgumentException("No type " + type);
+
+ }
+ }
+
+ /**
+ * Set the TTL value for the records created by the factory.
+ *
+ * @param ttl the ttl value, in seconds.
+ */
+ public static void setTtl(long ttl) {
+ RecordCreatorFactory.ttl = ttl;
+ }
+
+ /**
+ * A DNS Record creator.
+ *
+ * @param <R> the record type
+ * @param <T> the record's target type
+ */
+ public interface RecordCreator<R extends Record, T> {
+ R create(Name name, T target);
+ }
+
+ /**
+ * An A Record creator.
+ */
+ static class ARecordCreator implements RecordCreator<ARecord, InetAddress> {
+ /**
+ * Creates an A record creator.
+ */
+ public ARecordCreator() {
+ }
+
+ /**
+ * Creates a DNS A record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public ARecord create(Name name, InetAddress target) {
+ return new ARecord(name, DClass.IN, ttl, target);
+ }
+ }
+
+ /**
+ * An AAAA Record creator.
+ */
+ static class AAAARecordCreator
+ implements RecordCreator<AAAARecord, InetAddress> {
+ /**
+ * Creates an AAAA record creator.
+ */
+ public AAAARecordCreator() {
+ }
+
+ /**
+ * Creates a DNS AAAA record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public AAAARecord create(Name name, InetAddress target) {
+ return new AAAARecord(name, DClass.IN, ttl, target);
+ }
+ }
+
+ static class CNAMERecordCreator implements RecordCreator<CNAMERecord, Name> {
+ /**
+ * Creates a CNAME record creator.
+ */
+ public CNAMERecordCreator() {
+ }
+
+ /**
+ * Creates a DNS CNAME record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public CNAMERecord create(Name name, Name target) {
+ return new CNAMERecord(name, DClass.IN, ttl, target);
+ }
+ }
+
+ /**
+ * A TXT Record creator.
+ */
+ static class TXTRecordCreator
+ implements RecordCreator<TXTRecord, List<String>> {
+ /**
+ * Creates a TXT record creator.
+ */
+ public TXTRecordCreator() {
+ }
+
+ /**
+ * Creates a DNS TXT record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public TXTRecord create(Name name, List<String> target) {
+ return new TXTRecord(name, DClass.IN, ttl, target);
+ }
+ }
+
+ /**
+ * A PTR Record creator.
+ */
+ static class PTRRecordCreator implements RecordCreator<PTRRecord, Name> {
+ /**
+ * Creates a PTR record creator.
+ */
+ public PTRRecordCreator() {
+ }
+
+ /**
+ * Creates a DNS PTR record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public PTRRecord create(Name name, Name target) {
+ return new PTRRecord(name, DClass.IN, ttl, target);
+ }
+ }
+
+ /**
+ * A SRV Record creator.
+ */
+ static class SRVRecordCreator
+ implements RecordCreator<SRVRecord, HostPortInfo> {
+ /**
+ * Creates a SRV record creator.
+ */
+ public SRVRecordCreator() {
+ }
+
+ /**
+ * Creates a DNS SRV record.
+ *
+ * @param name the record name.
+ * @param target the record target/value.
+ * @return an A record.
+ */
+ @Override public SRVRecord create(Name name, HostPortInfo target) {
+ return new SRVRecord(name, DClass.IN, ttl, 1, 1, target.getPort(),
+ target.getHost());
+ }
+ }
+
+ /**
+ * An object for storing the host and port info used to generate SRV records.
+ */
+ public static class HostPortInfo {
+ private Name host;
+ private int port;
+
+ /**
+ * Creates an object with a host and port pair.
+ *
+ * @param host the hostname/ip
+ * @param port the port value
+ */
+ public HostPortInfo(Name host, int port) {
+ this.setHost(host);
+ this.setPort(port);
+ }
+
+ /**
+ * Return the host name.
+ * @return the host name.
+ */
+ Name getHost() {
+ return host;
+ }
+
+ /**
+ * Set the host name.
+ * @param host the host name.
+ */
+ void setHost(Name host) {
+ this.host = host;
+ }
+
+ /**
+ * Get the port.
+ * @return the port.
+ */
+ int getPort() {
+ return port;
+ }
+
+ /**
+ * Set the port.
+ * @param port the port.
+ */
+ void setPort(int port) {
+ this.port = port;
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org