You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/12 18:37:02 UTC

[1/3] hadoop git commit: YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron

Repository: hadoop
Updated Branches:
  refs/heads/YARN-4757 be34e85e6 -> 8f3c3a22a


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
new file mode 100644
index 0000000..37f0d23
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.xbill.DNS.AAAARecord;
+import org.xbill.DNS.ARecord;
+import org.xbill.DNS.CNAMERecord;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.DNSKEYRecord;
+import org.xbill.DNS.DNSSEC;
+import org.xbill.DNS.Flags;
+import org.xbill.DNS.Message;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.OPTRecord;
+import org.xbill.DNS.PTRRecord;
+import org.xbill.DNS.RRSIGRecord;
+import org.xbill.DNS.RRset;
+import org.xbill.DNS.Rcode;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SRVRecord;
+import org.xbill.DNS.Section;
+import org.xbill.DNS.Type;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.spec.RSAPrivateKeySpec;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_MASK;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_SUBNET;
+
+/**
+ *
+ */
+public class TestRegistryDNS extends Assert {
+
+  private RegistryDNS registryDNS;
+  private RegistryUtils.ServiceRecordMarshal marshal;
+
+  private static final String APPLICATION_RECORD = "{\n"
+      + "  \"type\" : \"JSONServiceRecord\",\n"
+      + "  \"description\" : \"Slider Application Master\",\n"
+      + "  \"external\" : [ {\n"
+      + "    \"api\" : \"classpath:org.apache.slider.appmaster.ipc\",\n"
+      + "    \"addressType\" : \"host/port\",\n"
+      + "    \"protocolType\" : \"hadoop/IPC\",\n"
+      + "    \"addresses\" : [ {\n"
+      + "      \"host\" : \"192.168.1.5\",\n"
+      + "      \"port\" : \"1026\"\n"
+      + "    } ]\n"
+      + "  }, {\n"
+      + "    \"api\" : \"http://\",\n"
+      + "    \"addressType\" : \"uri\",\n"
+      + "    \"protocolType\" : \"webui\",\n"
+      + "    \"addresses\" : [ {\n"
+      + "      \"uri\" : \"http://192.168.1.5:1027\"\n"
+      + "    } ]\n"
+      + "  }, {\n"
+      + "    \"api\" : \"classpath:org.apache.slider.management\",\n"
+      + "    \"addressType\" : \"uri\",\n"
+      + "    \"protocolType\" : \"REST\",\n"
+      + "    \"addresses\" : [ {\n"
+      + "      \"uri\" : \"http://192.168.1.5:1027/ws/v1/slider/mgmt\"\n"
+      + "    } ]\n"
+      + "  } ],\n"
+      + "  \"internal\" : [ {\n"
+      + "    \"api\" : \"classpath:org.apache.slider.agents.secure\",\n"
+      + "    \"addressType\" : \"uri\",\n"
+      + "    \"protocolType\" : \"REST\",\n"
+      + "    \"addresses\" : [ {\n"
+      + "      \"uri\" : \"https://192.168.1.5:47700/ws/v1/slider/agents\"\n"
+      + "    } ]\n"
+      + "  }, {\n"
+      + "    \"api\" : \"classpath:org.apache.slider.agents.oneway\",\n"
+      + "    \"addressType\" : \"uri\",\n"
+      + "    \"protocolType\" : \"REST\",\n"
+      + "    \"addresses\" : [ {\n"
+      + "      \"uri\" : \"https://192.168.1.5:35531/ws/v1/slider/agents\"\n"
+      + "    } ]\n"
+      + "  } ],\n"
+      + "  \"yarn:id\" : \"application_1451931954322_0016\",\n"
+      + "  \"yarn:persistence\" : \"application\"\n"
+      + "}\n";
+  static final String CONTAINER_RECORD = "{\n"
+      + "  \"type\" : \"JSONServiceRecord\",\n"
+      + "  \"description\" : \"YCLOUD\",\n"
+      + "  \"external\" : [ ],\n"
+      + "  \"internal\" : [ ],\n"
+      + "  \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+      + "  \"yarn:persistence\" : \"container\",\n"
+      + "  \"yarn:ip\" : \"172.17.0.19\",\n"
+      + "  \"yarn:hostname\" : \"0a134d6329ba\"\n"
+      + "}\n";
+
+  private static final String CONTAINER_RECORD_NO_IP = "{\n"
+      + "  \"type\" : \"JSONServiceRecord\",\n"
+      + "  \"description\" : \"YCLOUD\",\n"
+      + "  \"external\" : [ ],\n"
+      + "  \"internal\" : [ ],\n"
+      + "  \"yarn:id\" : \"container_e50_1451931954322_0016_01_000002\",\n"
+      + "  \"yarn:persistence\" : \"container\"\n"
+      + "}\n";
+
+  @Before
+  public void initialize() throws Exception {
+    setRegistryDNS(new RegistryDNS("TestRegistry"));
+    Configuration conf = createConfiguration();
+
+    getRegistryDNS().setDomainName(conf);
+    getRegistryDNS().initializeZones(conf);
+
+    setMarshal(new RegistryUtils.ServiceRecordMarshal());
+  }
+
+  protected Configuration createConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
+    conf.set(RegistryConstants.KEY_DNS_ZONE_SUBNET, "172.17.0");
+    conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
+    return conf;
+  }
+
+  protected boolean isSecure() {
+    return false;
+  }
+
+  @After
+  public void closeRegistry() throws Exception {
+    getRegistryDNS().stopExecutor();
+  }
+
+  @Test
+  public void testAppRegistration() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        APPLICATION_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/", record);
+
+    // start assessing whether correct records are available
+    Record[] recs = assertDNSQuery("test1.root.hwx.test.");
+    assertEquals("wrong result", "192.168.1.5",
+        ((ARecord) recs[0]).getAddress().getHostAddress());
+
+    recs = assertDNSQuery("management-api.test1.root.hwx.test.", 2);
+    assertEquals("wrong target name", "test1.root.hwx.test.",
+        ((CNAMERecord) recs[0]).getTarget().toString());
+    assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
+
+    recs = assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.",
+        Type.SRV, 1);
+    assertTrue("not an SRV record", recs[0] instanceof SRVRecord);
+    assertEquals("wrong port", 1026, ((SRVRecord) recs[0]).getPort());
+
+    recs = assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.", 2);
+    assertEquals("wrong target name", "test1.root.hwx.test.",
+        ((CNAMERecord) recs[0]).getTarget().toString());
+    assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
+
+    recs = assertDNSQuery("http-api.test1.root.hwx.test.", 2);
+    assertEquals("wrong target name", "test1.root.hwx.test.",
+        ((CNAMERecord) recs[0]).getTarget().toString());
+    assertTrue("not an ARecord", recs[isSecure() ? 2 : 1] instanceof ARecord);
+
+    recs = assertDNSQuery("http-api.test1.root.hwx.test.", Type.SRV,
+        1);
+    assertTrue("not an SRV record", recs[0] instanceof SRVRecord);
+    assertEquals("wrong port", 1027, ((SRVRecord) recs[0]).getPort());
+
+    assertDNSQuery("test1.root.hwx.test.", Type.TXT, 3);
+    assertDNSQuery("appmaster-ipc-api.test1.root.hwx.test.", Type.TXT, 1);
+    assertDNSQuery("http-api.test1.root.hwx.test.", Type.TXT, 1);
+    assertDNSQuery("management-api.test1.root.hwx.test.", Type.TXT, 1);
+  }
+
+  @Test
+  public void testContainerRegistration() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs =
+        assertDNSQuery("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
+    assertEquals("wrong result", "172.17.0.19",
+        ((ARecord) recs[0]).getAddress().getHostAddress());
+
+    recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
+    assertTrue("not an ARecord", recs[0] instanceof ARecord);
+  }
+
+  @Test
+  public void testRecordTTL() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs = assertDNSQuery(
+        "ctr-e50-1451931954322-0016-01-000002.hwx.test.");
+    assertEquals("wrong result", "172.17.0.19",
+        ((ARecord) recs[0]).getAddress().getHostAddress());
+    assertEquals("wrong ttl", 30L, recs[0].getTTL());
+
+    recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
+    assertTrue("not an ARecord", recs[0] instanceof ARecord);
+
+    assertEquals("wrong ttl", 30L, recs[0].getTTL());
+  }
+
+  @Test
+  public void testReverseLookup() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
+    assertEquals("wrong result",
+        "ctr-e50-1451931954322-0016-01-000002.hwx.test.",
+        ((PTRRecord) recs[0]).getTarget().toString());
+  }
+
+  @Test
+  public void testReverseLookupInLargeNetwork() throws Exception {
+    setRegistryDNS(new RegistryDNS("TestRegistry"));
+    Configuration conf = createConfiguration();
+    conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
+    conf.set(KEY_DNS_ZONE_SUBNET, "172.17.0.0");
+    conf.set(KEY_DNS_ZONE_MASK, "255.255.224.0");
+    conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
+
+    getRegistryDNS().setDomainName(conf);
+    getRegistryDNS().initializeZones(conf);
+
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
+    assertEquals("wrong result",
+        "ctr-e50-1451931954322-0016-01-000002.hwx.test.",
+        ((PTRRecord) recs[0]).getTarget().toString());
+  }
+
+  @Test
+  public void testMissingReverseLookup() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Name name = Name.fromString("19.1.17.172.in-addr.arpa.");
+    Record question = Record.newRecord(name, Type.PTR, DClass.IN);
+    Message query = Message.newQuery(question);
+    OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
+    query.addRecord(optRecord, Section.ADDITIONAL);
+    byte[] responseBytes = getRegistryDNS().generateReply(query, null);
+    Message response = new Message(responseBytes);
+    assertEquals("No answer should be returned", Rcode.NOTAUTH,
+        response.getRcode());
+  }
+
+  @Test
+  public void testNoContainerIP() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD_NO_IP.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Name name =
+        Name.fromString("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
+    Record question = Record.newRecord(name, Type.A, DClass.IN);
+    Message query = Message.newQuery(question);
+
+    byte[] responseBytes = getRegistryDNS().generateReply(query, null);
+    Message response = new Message(responseBytes);
+    assertEquals("wrong status", Rcode.NXDOMAIN, response.getRcode());
+  }
+
+  private Record[] assertDNSQuery(String lookup) throws IOException {
+    return assertDNSQuery(lookup, Type.A, 1);
+  }
+
+  private Record[] assertDNSQuery(String lookup, int numRecs)
+      throws IOException {
+    return assertDNSQuery(lookup, Type.A, numRecs);
+  }
+
+  Record[] assertDNSQuery(String lookup, int type, int numRecs)
+      throws IOException {
+    Name name = Name.fromString(lookup);
+    Record question = Record.newRecord(name, type, DClass.IN);
+    Message query = Message.newQuery(question);
+    OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
+    query.addRecord(optRecord, Section.ADDITIONAL);
+    byte[] responseBytes = getRegistryDNS().generateReply(query, null);
+    Message response = new Message(responseBytes);
+    assertEquals("not successful", Rcode.NOERROR, response.getRcode());
+    assertNotNull("Null response", response);
+    assertEquals("Questions do not match", query.getQuestion(),
+        response.getQuestion());
+    Record[] recs = response.getSectionArray(Section.ANSWER);
+    assertEquals("wrong number of answer records",
+        isSecure() ? numRecs * 2 : numRecs, recs.length);
+    if (isSecure()) {
+      boolean signed = false;
+      for (Record record : recs) {
+        signed = record.getType() == Type.RRSIG;
+        if (signed) {
+          break;
+        }
+      }
+      assertTrue("No signatures found", signed);
+    }
+    return recs;
+  }
+
+  @Test
+  public void testDNSKEYRecord() throws Exception {
+    String publicK =
+        "AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+            + "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+            + "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+            + "l9Ozs5bV";
+    //    byte[] publicBytes = Base64.decodeBase64(publicK);
+    //    X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicBytes);
+    //    KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+    //    PublicKey pubKey = keyFactory.generatePublic(keySpec);
+    DNSKEYRecord dnskeyRecord =
+        new DNSKEYRecord(Name.fromString("hwxstg.site."), DClass.IN, 0,
+            DNSKEYRecord.Flags.ZONE_KEY,
+            DNSKEYRecord.Protocol.DNSSEC,
+            DNSSEC.Algorithm.RSASHA256,
+            Base64.decodeBase64(publicK.getBytes()));
+    assertNotNull(dnskeyRecord);
+    RSAPrivateKeySpec privateSpec = new RSAPrivateKeySpec(new BigInteger(1,
+        Base64.decodeBase64(
+            "7Ul6/QDPWSGVAK9/Se53X8I0dDDA8S7wE1yFm2F0PEo9Wfb3KsMIegBaPCIaw5LDd"
+                + "LMg+trBJsfPImyOfSgsGEasfpB50UafJ2jGM2zDeb9IKY6NH9rssYEAwMUq"
+                + "oWKiLiA7K43rqy8F5j7/m7Dvb7R6L0BDbSCp/qqX07OzltU=")),
+        new BigInteger(1, Base64.decodeBase64(
+            "MgbQ6DBYhskeufNGGdct0cGG/4wb0X183ggenwCv2dopDyOTPq+5xMb4Pz9Ndzgk/"
+                + "yCY7mpaWIu9rttGOzrR+LBRR30VobPpMK1bMnzu2C0x08oYAguVwZB79DLC"
+                + "705qmZpiaaFB+LnhG7VtpPiOBm3UzZxdrBfeq/qaKrXid60=")));
+    KeyFactory factory = KeyFactory.getInstance("RSA");
+    PrivateKey priv = factory.generatePrivate(privateSpec);
+
+    ARecord aRecord = new ARecord(Name.fromString("some.test."), DClass.IN, 0,
+        InetAddress.getByName("192.168.0.1"));
+    Calendar cal = Calendar.getInstance();
+    Date inception = cal.getTime();
+    cal.add(Calendar.YEAR, 1);
+    Date expiration = cal.getTime();
+    RRset rrset = new RRset(aRecord);
+    RRSIGRecord rrsigRecord = DNSSEC.sign(rrset,
+        dnskeyRecord,
+        priv,
+        inception,
+        expiration);
+    DNSSEC.verify(rrset, rrsigRecord, dnskeyRecord);
+
+  }
+
+  @Test
+  public void testIpv4toIpv6() throws Exception {
+    InetAddress address =
+        BaseServiceRecordProcessor
+            .getIpv6Address(InetAddress.getByName("172.17.0.19"));
+    assertTrue("not an ipv6 address", address instanceof Inet6Address);
+    assertEquals("wrong IP", "172.17.0.19",
+        InetAddress.getByAddress(address.getAddress()).getHostAddress());
+  }
+
+  @Test
+  public void testAAAALookup() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs = assertDNSQuery(
+        "ctr-e50-1451931954322-0016-01-000002.hwx.test.", Type.AAAA, 1);
+    assertEquals("wrong result", "172.17.0.19",
+        ((AAAARecord) recs[0]).getAddress().getHostAddress());
+
+    recs = assertDNSQuery("ycloud.test1.root.hwx.test.", Type.AAAA, 1);
+    assertTrue("not an ARecord", recs[0] instanceof AAAARecord);
+  }
+
+  @Test
+  public void testNegativeLookup() throws Exception {
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Name name = Name.fromString("missing.hwx.test.");
+    Record question = Record.newRecord(name, Type.A, DClass.IN);
+    Message query = Message.newQuery(question);
+
+    byte[] responseBytes = getRegistryDNS().generateReply(query, null);
+    Message response = new Message(responseBytes);
+    assertEquals("not successful", Rcode.NXDOMAIN, response.getRcode());
+    assertNotNull("Null response", response);
+    assertEquals("Questions do not match", query.getQuestion(),
+        response.getQuestion());
+    Record[] sectionArray = response.getSectionArray(Section.AUTHORITY);
+    assertEquals("Wrong number of recs in AUTHORITY", isSecure() ? 2 : 1,
+        sectionArray.length);
+    boolean soaFound = false;
+    for (Record rec : sectionArray) {
+      soaFound = rec.getType() == Type.SOA;
+      if (soaFound) {
+        break;
+      }
+    }
+    assertTrue("wrong record type",
+        soaFound);
+
+  }
+
+  @Test
+  public void testReadMasterFile() throws Exception {
+    setRegistryDNS(new RegistryDNS("TestRegistry"));
+    Configuration conf = new Configuration();
+    conf.set(RegistryConstants.KEY_DNS_DOMAIN, "hwx.test");
+    conf.set(RegistryConstants.KEY_DNS_ZONE_SUBNET, "172.17.0");
+    conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
+    conf.set(RegistryConstants.KEY_DNS_ZONES_DIR,
+        getClass().getResource("/").getFile());
+    if (isSecure()) {
+      conf.setBoolean(RegistryConstants.KEY_DNSSEC_ENABLED, true);
+      conf.set(RegistryConstants.KEY_DNSSEC_PUBLIC_KEY,
+          "AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+              + "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+              + "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+              + "l9Ozs5bV");
+      conf.set(RegistryConstants.KEY_DNSSEC_PRIVATE_KEY_FILE,
+          getClass().getResource("/test.private").getFile());
+    }
+
+    getRegistryDNS().setDomainName(conf);
+    getRegistryDNS().initializeZones(conf);
+
+    ServiceRecord record = getMarshal().fromBytes("somepath",
+        CONTAINER_RECORD.getBytes());
+    getRegistryDNS().register(
+        "/registry/users/root/services/org-apache-slider/test1/components/"
+            + "container-e50-1451931954322-0016-01-000002",
+        record);
+
+    // start assessing whether correct records are available
+    Record[] recs =
+        assertDNSQuery("ctr-e50-1451931954322-0016-01-000002.hwx.test.");
+    assertEquals("wrong result", "172.17.0.19",
+        ((ARecord) recs[0]).getAddress().getHostAddress());
+
+    recs = assertDNSQuery("ycloud.test1.root.hwx.test.", 1);
+    assertTrue("not an ARecord", recs[0] instanceof ARecord);
+
+    // lookup dyanmic reverse records
+    recs = assertDNSQuery("19.0.17.172.in-addr.arpa.", Type.PTR, 1);
+    assertEquals("wrong result",
+        "ctr-e50-1451931954322-0016-01-000002.hwx.test.",
+        ((PTRRecord) recs[0]).getTarget().toString());
+
+    // now lookup static reverse records
+    Name name = Name.fromString("5.0.17.172.in-addr.arpa.");
+    Record question = Record.newRecord(name, Type.PTR, DClass.IN);
+    Message query = Message.newQuery(question);
+    OPTRecord optRecord = new OPTRecord(4096, 0, 0, Flags.DO, null);
+    query.addRecord(optRecord, Section.ADDITIONAL);
+    byte[] responseBytes = getRegistryDNS().generateReply(query, null);
+    Message response = new Message(responseBytes);
+    recs = response.getSectionArray(Section.ANSWER);
+    assertEquals("wrong result", "cn005.hwx.test.",
+        ((PTRRecord) recs[0]).getTarget().toString());
+  }
+
+  @Test
+  public void testReverseZoneNames() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(KEY_DNS_ZONE_SUBNET, "172.26.32.0");
+    conf.set(KEY_DNS_ZONE_MASK, "255.255.224.0");
+
+    Name name = getRegistryDNS().getReverseZoneName(conf);
+    assertEquals("wrong name", "26.172.in-addr.arpa.", name.toString());
+  }
+
+  public RegistryDNS getRegistryDNS() {
+    return registryDNS;
+  }
+
+  public void setRegistryDNS(
+      RegistryDNS registryDNS) {
+    this.registryDNS = registryDNS;
+  }
+
+  public RegistryUtils.ServiceRecordMarshal getMarshal() {
+    return marshal;
+  }
+
+  public void setMarshal(
+      RegistryUtils.ServiceRecordMarshal marshal) {
+    this.marshal = marshal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestSecureRegistryDNS.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestSecureRegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestSecureRegistryDNS.java
new file mode 100644
index 0000000..ded63bd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestSecureRegistryDNS.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+
+/**
+ *
+ */
+public class TestSecureRegistryDNS extends TestRegistryDNS {
+  @Override protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(RegistryConstants.KEY_DNSSEC_ENABLED, true);
+    conf.set(RegistryConstants.KEY_DNSSEC_PUBLIC_KEY,
+        "AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+            + "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+            + "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+            + "l9Ozs5bV");
+    conf.set(RegistryConstants.KEY_DNSSEC_PRIVATE_KEY_FILE,
+        getClass().getResource("/test.private").getFile());
+
+    return conf;
+  }
+
+  @Override protected boolean isSecure() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/0.17.172.in-addr.arpa.zone
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/0.17.172.in-addr.arpa.zone b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/0.17.172.in-addr.arpa.zone
new file mode 100644
index 0000000..0165f0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/0.17.172.in-addr.arpa.zone
@@ -0,0 +1,36 @@
+;
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;      http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+;
+$ORIGIN .
+$TTL 1800 ; 30 minutes
+0.17.172.in-addr.arpa IN SOA ns.hwhq.hortonworks.com. it.hortonworks.com. (
+	2015081000 ; serial
+	10800      ; refresh (3 hours)
+	900        ; retry (15 minutes)
+	1814400    ; expire (3 weeks)
+	10800      ; minimum (3 hours)
+)
+	NS	ns.hwhq.hortonworks.com.
+	NS	ns2.hwhq.hortonworks.com.
+
+$ORIGIN 0.17.172.in-addr.arpa.
+5 	PTR 	cn005.hwx.test.
+6 	PTR 	cn006.hwx.test.
+7 	PTR 	cn007.hwx.test.
+8	PTR 	cn008.hwx.test.
+9 	PTR 	cn009.hwx.test.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/test.private
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/test.private b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/test.private
new file mode 100644
index 0000000..5f0da9d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/resources/test.private
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Private-key-format: v1.3
+Algorithm: 8 (RSASHA256)
+Modulus: 7Ul6/QDPWSGVAK9/Se53X8I0dDDA8S7wE1yFm2F0PEo9Wfb3KsMIegBaPCIaw5LDdLMg+trBJsfPImyOfSgsGEasfpB50UafJ2jGM2zDeb9IKY6NH9rssYEAwMUqoWKiLiA7K43rqy8F5j7/m7Dvb7R6L0BDbSCp/qqX07OzltU=
+PublicExponent: AQAB
+PrivateExponent: MgbQ6DBYhskeufNGGdct0cGG/4wb0X183ggenwCv2dopDyOTPq+5xMb4Pz9Ndzgk/yCY7mpaWIu9rttGOzrR+LBRR30VobPpMK1bMnzu2C0x08oYAguVwZB79DLC705qmZpiaaFB+LnhG7VtpPiOBm3UzZxdrBfeq/qaKrXid60=
+Prime1: /HFdjI4cRuJBjK9IGWWmmVZWwaFsQYO9GHLCDwjm691GxaDpXuMdPd0uH9EqQvskyF8JPmzQXI43swyUFjizow==
+Prime2: 8KFxkWEHlhgB2GLi8tk39TKY5vmFUvh4FO28COl1N/rWjKVpfM1p6HQ6YavoGNZQmDBazv4WOZRqSQukHApzJw==
+Exponent1: alX+h/RcqOcpoW88OaZ99N1PkiTDCx3JC4FbiSXAz93Xr+vGIfgdGzAN+80JtklABz8xD6CabEJj6AIGZw3fbQ==
+Exponent2: vvPusqZkJcjBVh0K6hpUXKEdU1W5ZmFEsZ8Cs7PH0Hee4Je3QVGk9NGfLrkDgwo3hL4CofZiXqkXOwYg4husyw==
+Coefficient: omxpbNU6u/swbnkTC6MicaDqbJP7ETnCCJ1iN2+HZO/AlQCFlqVzLwGZmvGMAGA9ZWF+YpqpPhvzi4bWmi5XrQ==
+Created: 20160119155251
+Publish: 20160119155251
+Activate: 20160119155251
+


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/3] hadoop git commit: YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
new file mode 100644
index 0000000..52b3c37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
@@ -0,0 +1,1534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.net.util.Base64;
+import org.apache.commons.net.util.SubnetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.DNSOperations;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xbill.DNS.CNAMERecord;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.DNSKEYRecord;
+import org.xbill.DNS.DNSSEC;
+import org.xbill.DNS.DSRecord;
+import org.xbill.DNS.ExtendedFlags;
+import org.xbill.DNS.Flags;
+import org.xbill.DNS.Header;
+import org.xbill.DNS.Message;
+import org.xbill.DNS.NSRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.NameTooLongException;
+import org.xbill.DNS.OPTRecord;
+import org.xbill.DNS.Opcode;
+import org.xbill.DNS.RRSIGRecord;
+import org.xbill.DNS.RRset;
+import org.xbill.DNS.Rcode;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SOARecord;
+import org.xbill.DNS.Section;
+import org.xbill.DNS.SetResponse;
+import org.xbill.DNS.TSIG;
+import org.xbill.DNS.TSIGRecord;
+import org.xbill.DNS.TextParseException;
+import org.xbill.DNS.Type;
+import org.xbill.DNS.Zone;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.RSAPrivateKeySpec;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
+
+/**
+ * A DNS service reflecting the state of the YARN registry.  Records are created
+ * based on service records available in the YARN ZK-based registry.
+ */
+public class RegistryDNS extends AbstractService implements DNSOperations,
+    ZoneSelector {
+
+  public static final String CONTAINER = "container";
+
+  static final int FLAG_DNSSECOK = 1;
+  static final int FLAG_SIGONLY = 2;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryDNS.class);
+  public static final String IN_ADDR_ARPA = "in-addr.arpa.";
+  public static final String ZONE_SUFFIX = ".zone";
+
+  private ExecutorService executor;
+  private ReentrantReadWriteLock zoneLock = new ReentrantReadWriteLock();
+  private CloseableLock readLock = new CloseableLock(zoneLock.readLock());
+  private CloseableLock writeLock = new CloseableLock(zoneLock.writeLock());
+  private String domainName;
+  private long ttl = 0L;
+
+  private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?");
+  private Boolean dnssecEnabled;
+  private PrivateKey privateKey;
+
+  private ConcurrentMap<Name, DNSKEYRecord> dnsKeyRecs =
+      new ConcurrentHashMap<>();
+  private ConcurrentMap<Name, Zone> zones = new ConcurrentHashMap<>();
+  private Name bindHost;
+
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public RegistryDNS(String name) {
+    super(name);
+    executor = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactory() {
+          private AtomicInteger counter = new AtomicInteger(1);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r,
+                "RegistryDNS "
+                    + counter.getAndIncrement());
+          }
+        });
+  }
+
+  /**
+   * Initializes the registry.
+   *
+   * @param conf the hadoop configuration
+   * @throws Exception if there are tcp/udp issues
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+    // create the zone.  for now create a "dummy" SOA record
+    try {
+      setDomainName(conf);
+
+      int port = initializeZones(conf);
+
+      InetAddress addr = InetAddress.getLocalHost();
+
+      String bindAddress = conf.get(KEY_DNS_BIND_ADDRESS);
+      if (bindAddress != null) {
+        addr = InetAddress.getByName(bindAddress);
+      }
+      addNIOUDP(addr, port);
+      addNIOTCP(addr, port);
+
+    } catch (IOException e) {
+      LOG.error("Error initializing Registry DNS Server", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Initializes the registry based on available parameters in the hadoop
+   * configuration.
+   *
+   * @param conf the hadoop configuration
+   * @return the listener port
+   * @throws IOException
+   */
+  int initializeZones(Configuration conf) throws IOException {
+    int port = conf.getInt(KEY_DNS_PORT, DEFAULT_DNS_PORT);
+    ttl = conf.getTimeDuration(KEY_DNS_TTL, 1L, TimeUnit.SECONDS);
+    RecordCreatorFactory.setTtl(ttl);
+
+    setDNSSECEnabled(conf);
+
+    initializeZonesFromFiles(conf);
+
+    Zone registryZone = configureZone(Name.fromString(domainName), conf);
+    zones.put(registryZone.getOrigin(), registryZone);
+
+    initializeReverseLookupZone(conf);
+
+    return port;
+  }
+
+  /**
+   * Signs zone records if necessary (DNSSEC enabled).  Zones may not have
+   * their NS and SOA records signed if they were initialized from master files.
+   */
+  private void signZones() throws IOException {
+    if (isDNSSECEnabled()) {
+      Collection<Zone> zoneCollection = zones.values();
+      for (Zone zone : zoneCollection) {
+        Iterator itor = zone.iterator();
+        while (itor.hasNext()) {
+          RRset rRset = (RRset) itor.next();
+          Iterator sigs = rRset.sigs();
+          if (!sigs.hasNext()) {
+            try {
+              signSiteRecord(zone, rRset.first());
+            } catch (DNSSEC.DNSSECException e) {
+              throw new IOException(e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Initializes a zone by reading any zone file by the same name in the
+   * designated zone file directory.
+   *
+   * @param conf the Hadoop configuration object.
+   * @throws IOException
+   */
+  private void initializeZonesFromFiles(Configuration conf) throws IOException {
+    // should this be in HDFS?
+    String zonesDir = conf.get(KEY_DNS_ZONES_DIR);
+    if (zonesDir != null) {
+      Iterator<File> iterator = FileUtils.iterateFiles(new File(zonesDir),
+          new IOFileFilter() {
+            @Override
+            public boolean accept(
+                File file) {
+              return file.getName().endsWith(
+                  ZONE_SUFFIX);
+            }
+
+            @Override
+            public boolean accept(
+                File file,
+                String s) {
+              return s.endsWith(
+                  ZONE_SUFFIX);
+            }
+          }, null);
+      while (iterator.hasNext()) {
+        File file = iterator.next();
+        String name = file.getName();
+        name = name.substring(0, name.indexOf(ZONE_SUFFIX) + 1);
+        Zone zone = new SecureableZone(Name.fromString(name),
+            file.getAbsolutePath());
+        zones.putIfAbsent(zone.getOrigin(), zone);
+      }
+    }
+  }
+
+  /**
+   * Initializes the reverse lookup zone (mapping IP to name).
+   *
+   * @param conf the Hadoop configuration.
+   * @throws IOException
+   */
+  private void initializeReverseLookupZone(Configuration conf)
+      throws IOException {
+    Name reverseLookupZoneName = getReverseZoneName(conf);
+    Zone reverseLookupZone =
+        configureZone(reverseLookupZoneName, conf);
+    zones.put(reverseLookupZone.getOrigin(), reverseLookupZone);
+  }
+
+  /**
+   * Returns the list of reverse lookup zones.
+   *
+   * @param conf the hadoop configuration.
+   * @return the list of reverse zone names required based on the configuration
+   * properties.
+   */
+  protected Name getReverseZoneName(Configuration conf) {
+    Name name = null;
+    String zoneSubnet = getZoneSubnet(conf);
+    if (zoneSubnet == null) {
+      LOG.warn("Zone subnet is not configured.  Reverse lookups disabled");
+    } else {
+      // is there a netmask
+      String mask = conf.get(KEY_DNS_ZONE_MASK);
+      if (mask != null) {
+        // get the range of IPs
+        SubnetUtils utils = new SubnetUtils(zoneSubnet, mask);
+        name = getReverseZoneName(utils, zoneSubnet);
+      } else {
+        name = getReverseZoneName(zoneSubnet);
+      }
+    }
+    return name;
+  }
+
+  /**
+   * Return the subnet for the zone.  this should be a network address for the
+   * subnet (ends in ".0").
+   *
+   * @param conf the hadoop configuration.
+   * @return the zone subnet.
+   */
+  private String getZoneSubnet(Configuration conf) {
+    String subnet = conf.get(KEY_DNS_ZONE_SUBNET);
+    if (subnet != null) {
+      final String[] bytes = subnet.split("\\.");
+      if (bytes.length == 3) {
+        subnet += ".0";
+      }
+    }
+    return subnet;
+  }
+
+  /**
+   * Return the reverse zone name based on the address.
+   *
+   * @param networkAddress the network address.
+   * @return the reverse zone name.
+   */
+  private Name getReverseZoneName(String networkAddress) {
+    return getReverseZoneName(null, networkAddress);
+  }
+
+  /**
+   * Return the reverse zone name based on the address.
+   *
+   * @param utils          subnet utils
+   * @param networkAddress the network address.
+   * @return the reverse zone name.
+   */
+  private Name getReverseZoneName(SubnetUtils utils, String networkAddress) {
+    Name reverseZoneName = null;
+    boolean isLargeNetwork = false;
+    if (utils != null) {
+      isLargeNetwork = utils.getInfo().getAddressCount() > 256;
+    }
+    final String[] bytes = networkAddress.split("\\.");
+    if (bytes.length == 4) {
+      String reverseLookupZoneName = null;
+      if (isLargeNetwork) {
+        reverseLookupZoneName =
+            String.format("%s.%s.%s",
+                bytes[1],
+                bytes[0],
+                IN_ADDR_ARPA);
+      } else {
+        reverseLookupZoneName =
+            String.format("%s.%s.%s.%s",
+                bytes[2],
+                bytes[1],
+                bytes[0],
+                IN_ADDR_ARPA);
+      }
+      try {
+        reverseZoneName = Name.fromString(reverseLookupZoneName);
+      } catch (TextParseException e) {
+        LOG.warn("Unable to convert {} to DNS name", reverseLookupZoneName);
+      }
+    }
+    return reverseZoneName;
+  }
+
+  /**
+   * Create the zone and its related zone associated DNS records  (NS, SOA).
+   *
+   * @param zoneName domain name of the zone
+   * @param conf     configuration reference.
+   * @return the zone.
+   * @throws IOException
+   */
+  private Zone configureZone(Name zoneName, Configuration conf)
+      throws IOException {
+    bindHost = Name.fromString(
+        InetAddress.getLocalHost().getCanonicalHostName() + ".");
+    SOARecord soaRecord = new SOARecord(zoneName, DClass.IN, ttl,
+        bindHost,
+        bindHost, getSerial(), 86000, 7200,
+        1209600, 600);
+    NSRecord nsRecord = new NSRecord(zoneName, DClass.IN, ttl, bindHost);
+    Zone zone = zones.get(zoneName);
+    if (zone == null) {
+      zone = new SecureableZone(zoneName, new Record[] {soaRecord, nsRecord});
+    }
+
+    try {
+      enableDNSSECIfNecessary(zone, conf, soaRecord, nsRecord);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    } catch (InvalidKeySpecException e) {
+      throw new IOException(e);
+    } catch (DNSSEC.DNSSECException e) {
+      throw new IOException(e);
+    }
+
+    return zone;
+  }
+
+  /**
+   * Return a serial number based on the current date and time.
+   *
+   * @return the serial number.
+   */
+  private long getSerial() {
+    Date curDate = new Date();
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHH");
+    String serial = simpleDateFormat.format(curDate);
+    return Long.parseLong(serial);
+  }
+
+  /**
+   * Set the value of the DNSSEC enabled property.
+   *
+   * @param conf the Hadoop configuration.
+   */
+  private void setDNSSECEnabled(Configuration conf) {
+    dnssecEnabled = conf.getBoolean(KEY_DNSSEC_ENABLED, false);
+  }
+
+  /**
+   * Is DNSSEC enabled?
+   *
+   * @return true if enabled, false otherwise.
+   */
+  private boolean isDNSSECEnabled() {
+    return dnssecEnabled;
+  }
+
+  /**
+   * Load the required public/private keys, create the zone DNSKEY record, and
+   * sign the zone level records.
+   *
+   * @param zone      the zone.
+   * @param conf      the configuration.
+   * @param soaRecord the SOA record.
+   * @param nsRecord  the NS record.
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   * @throws InvalidKeySpecException
+   * @throws DNSSEC.DNSSECException
+   */
+  private void enableDNSSECIfNecessary(Zone zone, Configuration conf,
+      SOARecord soaRecord,
+      NSRecord nsRecord)
+      throws IOException, NoSuchAlgorithmException, InvalidKeySpecException,
+      DNSSEC.DNSSECException {
+    if (isDNSSECEnabled()) {
+      // read in the DNSKEY and create the DNSKEYRecord
+      // TODO:  reading these out of config seems wrong...
+      String publicKey = conf.get(KEY_DNSSEC_PUBLIC_KEY);
+      if (publicKey == null) {
+        throw new IOException("DNSSEC Key not configured");
+      }
+      //TODO - perhaps read in actual DNSKEY record structure?
+      Name zoneName = zone.getOrigin();
+      DNSKEYRecord dnskeyRecord = dnsKeyRecs.get(zoneName);
+      if (dnskeyRecord == null) {
+        byte[] key = Base64.decodeBase64(publicKey.getBytes("UTF-8"));
+        dnskeyRecord = new DNSKEYRecord(zoneName,
+            DClass.IN, ttl,
+            DNSKEYRecord.Flags.ZONE_KEY,
+            DNSKEYRecord.Protocol.DNSSEC,
+            DNSSEC.Algorithm.RSASHA256, key);
+        dnsKeyRecs.putIfAbsent(zoneName, dnskeyRecord);
+      }
+      LOG.info("Registering {}", dnskeyRecord);
+      try (CloseableLock lock = writeLock.lock()) {
+        zone.addRecord(dnskeyRecord);
+
+        String privateKeyFile = conf.get(KEY_DNSSEC_PRIVATE_KEY_FILE,
+            DEFAULT_DNSSEC_PRIVATE_KEY_FILE);
+
+        Properties props = new Properties();
+        try (
+            FileInputStream inputStream = new FileInputStream(privateKeyFile)) {
+          props.load(inputStream);
+        }
+
+        String privateModulus = props.getProperty("Modulus");
+        String privateExponent = props.getProperty("PrivateExponent");
+
+        RSAPrivateKeySpec privateSpec = new RSAPrivateKeySpec(
+            new BigInteger(1, Base64.decodeBase64(privateModulus)),
+            new BigInteger(1, Base64.decodeBase64(privateExponent)));
+
+        KeyFactory factory = KeyFactory.getInstance("RSA");
+        privateKey = factory.generatePrivate(privateSpec);
+
+        signSiteRecord(zone, dnskeyRecord);
+        signSiteRecord(zone, soaRecord);
+        signSiteRecord(zone, nsRecord);
+      }
+      // create required DS records
+
+      // domain
+//      DSRecord dsRecord = new DSRecord(zoneName, DClass.IN, ttl,
+//                                       DSRecord.Digest.SHA1, dnskeyRecord);
+//      zone.addRecord(dsRecord);
+//      signSiteRecord(zone, dsRecord);
+    }
+  }
+
+  /**
+   * Sign a DNS record.
+   *
+   * @param zone   the zone reference
+   * @param record the record to sign.
+   * @throws DNSSEC.DNSSECException
+   */
+  private void signSiteRecord(Zone zone, Record record)
+      throws DNSSEC.DNSSECException {
+    RRset rrset = zone.findExactMatch(record.getName(),
+        record.getType());
+    Calendar cal = Calendar.getInstance();
+    Date inception = cal.getTime();
+    cal.add(Calendar.YEAR, 1);
+    Date expiration = cal.getTime();
+    RRSIGRecord rrsigRecord =
+        DNSSEC.sign(rrset, dnsKeyRecs.get(zone.getOrigin()),
+            privateKey, inception, expiration);
+    LOG.info("Adding {}", record);
+    rrset.addRR(rrsigRecord);
+  }
+
+  /**
+   * Sets the zone/domain name.  The name will be read from the configuration
+   * and the code will ensure the name is absolute.
+   *
+   * @param conf the configuration.
+   * @throws IOException
+   */
+  void setDomainName(Configuration conf) throws IOException {
+    domainName = conf.get(KEY_DNS_DOMAIN);
+    if (domainName == null) {
+      throw new IOException("No DNS domain name specified");
+    }
+    if (!domainName.endsWith(".")) {
+      domainName += ".";
+    }
+  }
+
+  /**
+   * Stops the registry.
+   *
+   * @throws Exception if the service stop generates an issue.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    stopExecutor();
+    super.serviceStop();
+  }
+
+  /**
+   * Shuts down the leveraged executor service.
+   */
+  protected synchronized void stopExecutor() {
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates a DNS error response.
+   *
+   * @param in the byte array detailing the error.
+   * @return the error message, in bytes
+   */
+  public byte[] formErrorMessage(byte[] in) {
+    Header header;
+    try {
+      header = new Header(in);
+    } catch (IOException e) {
+      return null;
+    }
+    return buildErrorMessage(header, Rcode.FORMERR, null);
+  }
+
+  /**
+   * Process a TCP request.
+   *
+   * @param ch the socket channel for the request.
+   * @throws IOException if the tcp processing generates an issue.
+   */
+  public void nioTCPClient(SocketChannel ch) throws IOException {
+    try {
+      // query sizes are small, so the following two lines should work
+      // in all instances
+      ByteBuffer buf = ByteBuffer.allocate(1024);
+      ch.read(buf);
+      buf.flip();
+      int messageLength = getMessgeLength(buf);
+
+      byte[] in = new byte[messageLength];
+
+      buf.get(in, 0, messageLength);
+
+      Message query;
+      byte[] response = null;
+      try {
+        query = new Message(in);
+        response = generateReply(query, ch.socket());
+        if (response == null) {
+          return;
+        }
+      } catch (IOException e) {
+        response = formErrorMessage(in);
+      }
+
+      ByteBuffer out = ByteBuffer.allocate(response.length + 2);
+      out.putShort(0, (short) (response.length & 0xffff));
+      out.put(response);
+
+      ch.write(out);
+    } catch (IOException e) {
+      throw NetUtils.wrapException(ch.socket().getInetAddress().getHostName(),
+          ch.socket().getPort(),
+          ch.socket().getLocalAddress().getHostName(),
+          ch.socket().getLocalPort(), e);
+    } finally {
+      IOUtils.closeStream(ch);
+    }
+
+  }
+
+  /**
+   * Calculate the inbound message length, which is related in the message as an
+   * unsigned short value.
+   *
+   * @param buf the byte buffer containing the message.
+   * @return the message length
+   * @throws EOFException
+   */
+  private int getMessgeLength(ByteBuffer buf) throws EOFException {
+    int ch1 = buf.get();
+    int ch2 = buf.get();
+    if ((ch1 | ch2) < 0) {
+      throw new EOFException();
+    }
+    return (ch1 << 8) + (ch2 & 0xff);
+  }
+
+  /**
+   * Monitor the TCP socket for inbound requests.
+   *
+   * @param serverSocketChannel the server socket channel
+   * @param addr                the local inet address
+   * @param port                the listener (local) port
+   * @throws Exception if the tcp processing fails.
+   */
+  public void serveNIOTCP(ServerSocketChannel serverSocketChannel,
+      InetAddress addr, int port) throws Exception {
+    try {
+
+      while (true) {
+        final SocketChannel socketChannel = serverSocketChannel.accept();
+        if (socketChannel != null) {
+          executor.submit(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+              nioTCPClient(socketChannel);
+              return true;
+            }
+          });
+
+        }
+      }
+    } catch (IOException e) {
+      throw NetUtils.wrapException(addr.getHostName(), port,
+          addr.getHostName(), port, e);
+    }
+  }
+
+  /**
+   * Open the TCP listener.
+   *
+   * @param addr the host address.
+   * @param port the host port.
+   * @return the created server socket channel.
+   * @throws IOException
+   */
+  private ServerSocketChannel openTCPChannel(InetAddress addr, int port)
+      throws IOException {
+    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+    try {
+      serverSocketChannel.socket().bind(new InetSocketAddress(addr, port));
+      serverSocketChannel.configureBlocking(false);
+    } catch (IOException e) {
+      throw NetUtils.wrapException(null, 0,
+          InetAddress.getLocalHost().getHostName(),
+          port, e);
+    }
+    return serverSocketChannel;
+  }
+
+  /**
+   * Create the thread (Callable) monitoring the TCP listener.
+   *
+   * @param addr host address.
+   * @param port host port.
+   * @throws Exception if the tcp listener generates an error.
+   */
+  public void addNIOTCP(final InetAddress addr, final int port)
+      throws Exception {
+    final ServerSocketChannel tcpChannel = openTCPChannel(addr, port);
+    executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          serveNIOTCP(tcpChannel, addr, port);
+        } catch (Exception e) {
+          LOG.error("Error initializing DNS TCP listener", e);
+          throw e;
+        }
+
+        return true;
+      }
+
+    });
+  }
+
+  /**
+   * Create the thread monitoring the socket for inbound UDP requests.
+   *
+   * @param addr host address.
+   * @param port host port.
+   * @throws Exception if the UDP listener creation generates an error.
+   */
+  public void addNIOUDP(final InetAddress addr, final int port)
+      throws Exception {
+    final DatagramChannel udpChannel = openUDPChannel(addr, port);
+    executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          serveNIOUDP(udpChannel, addr, port);
+        } catch (Exception e) {
+          LOG.error("Error initializing DNS UDP listener", e);
+          throw e;
+        }
+        return true;
+      }
+    });
+  }
+
+  /**
+   * Process an inbound UDP request.
+   *
+   * @param channel the UDP datagram channel.
+   * @param addr    local host address.
+   * @param port    local port.
+   * @throws IOException if the UDP processing fails.
+   */
+  private void serveNIOUDP(DatagramChannel channel,
+      InetAddress addr, int port) throws Exception {
+    SocketAddress remoteAddress = null;
+    try {
+
+      ByteBuffer input = ByteBuffer.allocate(4096);
+      ByteBuffer output = ByteBuffer.allocate(4096);
+      byte[] in = null;
+
+      while (true) {
+        input.clear();
+        try {
+          remoteAddress = channel.receive(input);
+        } catch (IOException e) {
+          LOG.debug("Error during message receipt", e);
+          continue;
+        }
+        Message query;
+        byte[] response = null;
+        try {
+          int position = input.position();
+          in = new byte[position];
+          input.flip();
+          input.get(in);
+          query = new Message(in);
+          LOG.info("{}:  received query {}", remoteAddress,
+              query.getQuestion());
+          response = generateReply(query, null);
+          if (response == null) {
+            continue;
+          }
+        } catch (IOException e) {
+          response = formErrorMessage(in);
+        }
+        output.clear();
+        output.put(response);
+        output.flip();
+
+        LOG.info("{}:  sending response", remoteAddress);
+        channel.send(output, remoteAddress);
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException && remoteAddress != null) {
+        throw NetUtils.wrapException(addr.getHostName(),
+            port,
+            ((InetSocketAddress) remoteAddress).getHostName(),
+            ((InetSocketAddress) remoteAddress).getPort(),
+            (IOException) e);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Create and UDP listener socket.
+   *
+   * @param addr host address.
+   * @param port host port.
+   * @return
+   * @throws IOException if listener creation fails.
+   */
+  private DatagramChannel openUDPChannel(InetAddress addr, int port)
+      throws IOException {
+    DatagramChannel channel = DatagramChannel.open();
+    try {
+      channel.socket().bind(new InetSocketAddress(addr, port));
+    } catch (IOException e) {
+      throw NetUtils.wrapException(null, 0,
+          InetAddress.getLocalHost().getHostName(),
+          port, e);
+    }
+    return channel;
+  }
+
+  /**
+   * Create an error message.
+   *
+   * @param header   the response header.
+   * @param rcode    the response code.
+   * @param question the question record.
+   * @return  the error message.
+   */
+  byte[] buildErrorMessage(Header header, int rcode, Record question) {
+    Message response = new Message();
+    response.setHeader(header);
+    for (int i = 0; i < 4; i++) {
+      response.removeAllRecords(i);
+    }
+    if (rcode == Rcode.SERVFAIL) {
+      response.addRecord(question, Section.QUESTION);
+    }
+    header.setRcode(rcode);
+    return response.toWire();
+  }
+
+  /**
+   * Generate an error message based on inbound query.
+   *
+   * @param query the query.
+   * @param rcode the response code for the specific error.
+   * @return the error message.
+   */
+  public byte[] errorMessage(Message query, int rcode) {
+    return buildErrorMessage(query.getHeader(), rcode,
+        query.getQuestion());
+  }
+
+  /**
+   * Generate the response for the inbound DNS query.
+   *
+   * @param query the query.
+   * @param s     the socket associated with the query.
+   * @return the response, in bytes.
+   * @throws IOException if reply generation fails.
+   */
+  byte[] generateReply(Message query, Socket s)
+      throws IOException {
+    Header header;
+    boolean badversion;
+    int maxLength;
+    int flags = 0;
+
+    OPTRecord queryOPT = query.getOPT();
+    maxLength = getMaxLength(s, queryOPT);
+
+    header = query.getHeader();
+    if (header.getFlag(Flags.QR)) {
+      LOG.debug("returning null");
+      return null;
+    }
+    if (header.getRcode() != Rcode.NOERROR) {
+      return errorMessage(query, Rcode.FORMERR);
+    }
+    if (header.getOpcode() != Opcode.QUERY) {
+      return errorMessage(query, Rcode.NOTIMP);
+    }
+
+    Record queryRecord = query.getQuestion();
+
+    if (queryOPT != null && (queryOPT.getFlags() & ExtendedFlags.DO) != 0) {
+      flags = FLAG_DNSSECOK;
+    }
+
+    Message response = new Message(query.getHeader().getID());
+    response.getHeader().setFlag(Flags.QR);
+    if (query.getHeader().getFlag(Flags.RD)) {
+      response.getHeader().setFlag(Flags.RD);
+    }
+    response.addRecord(queryRecord, Section.QUESTION);
+
+    Name name = queryRecord.getName();
+    int type = queryRecord.getType();
+    int dclass = queryRecord.getDClass();
+
+    TSIGRecord queryTSIG = query.getTSIG();
+    if (type == Type.AXFR && s != null) {
+      return doAXFR(name, query, null, queryTSIG, s);
+    }
+    if (!Type.isRR(type) && type != Type.ANY) {
+      return errorMessage(query, Rcode.NOTIMP);
+    }
+
+    LOG.debug("calling addAnswer");
+    byte rcode = addAnswer(response, name, type, dclass, 0, flags);
+    if (rcode != Rcode.NOERROR && rcode != Rcode.NXDOMAIN) {
+      return errorMessage(query, rcode);
+    }
+
+    addAdditional(response, flags);
+
+    if (queryOPT != null) {
+      int optflags = (flags == FLAG_DNSSECOK) ? ExtendedFlags.DO : 0;
+      OPTRecord opt = new OPTRecord((short) 4096, rcode >>> 16, (byte) 0,
+          optflags);
+      response.addRecord(opt, Section.ADDITIONAL);
+    }
+
+    return response.toWire(maxLength);
+  }
+
+  /**
+   * Create a query to forward to the primary DNS server (if configured).
+   * NOTE:  Experimental
+   *
+   * @param query the inbound query.
+   * @return the query to forward to the primary server.
+   * @throws NameTooLongException
+   * @throws TextParseException if query creation fails.
+   */
+  private Message createPrimaryQuery(Message query)
+      throws NameTooLongException, TextParseException {
+    Name name = query.getQuestion().getName();
+    if (name.labels() > 0 && name.labels() <= 2) {
+      // short relative or absolute name.  this code may not be necessary -
+      // OS resolution utilities probably append the search paths defined
+      // in resolv.conf prior to the lookup
+      int id = query.getHeader().getID();
+      String queryName = name.getLabelString(0);
+      Name qualifiedName = Name.concatenate(Name.fromString(queryName),
+          Name.fromString(domainName));
+      LOG.info("Received query {}.  Forwarding query {}", name, qualifiedName);
+      Record question = Record.newRecord(qualifiedName,
+          query.getQuestion().getType(),
+          query.getQuestion().getDClass());
+      query = Message.newQuery(question);
+      query.getHeader().setID(id);
+    }
+    return query;
+  }
+
+  /**
+   * Calculate the max length for a response.
+   *
+   * @param s        the request socket.
+   * @param queryOPT describes Extended DNS (EDNS) properties of a Message.
+   * @return  the length of the response.
+   */
+  private int getMaxLength(Socket s, OPTRecord queryOPT) {
+    int maxLength;
+    if (s != null) {
+      maxLength = 65535;
+    } else if (queryOPT != null) {
+      maxLength = Math.max(queryOPT.getPayloadSize(), 512);
+    } else {
+      maxLength = 512;
+    }
+    return maxLength;
+  }
+
+  /**
+   * Add additional information to a DNS response section if a glue name is
+   * specified.
+   *
+   * @param response the response message.
+   * @param section  the section of the response (e.g. ANSWER, AUTHORITY)
+   * @param flags the flags.
+   */
+  private void addAdditional2(Message response, int section, int flags) {
+    Record[] records = response.getSectionArray(section);
+    for (int i = 0; i < records.length; i++) {
+      Record r = records[i];
+      Name glueName = r.getAdditionalName();
+      if (glueName != null) {
+        addGlue(response, glueName, flags);
+      }
+    }
+  }
+
+  /**
+   * Process any additional records indicated for both the ANSWER and AUTHORITY
+   * sections of the response.
+   *
+   * @param response the response message.
+   * @param flags the flags.
+   */
+  private void addAdditional(Message response, int flags) {
+    addAdditional2(response, Section.ANSWER, flags);
+    addAdditional2(response, Section.AUTHORITY, flags);
+  }
+
+  /**
+   * Add the specific record indicated by the "glue", or the mapping to a
+   * specific host.
+   *
+   * @param response the response message.
+   * @param name     the name of the glue record.
+   * @param flags    the flags.
+   */
+  private void addGlue(Message response, Name name, int flags) {
+    RRset a = findExactMatch(name, Type.A);
+    if (a == null) {
+      return;
+    }
+    addRRset(name, response, a, Section.ADDITIONAL, flags);
+  }
+
+  /**
+   * Find the record set that matches the requested name and type.
+   *
+   * @param name the requested name.
+   * @param type the record type.
+   * @return the set of records with the given name and type.
+   */
+  public RRset findExactMatch(Name name, int type) {
+    try (CloseableLock lock = readLock.lock()) {
+      Zone zone = findBestZone(name);
+      if (zone != null) {
+        return zone.findExactMatch(name, type);
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Find the zone that correlates to the provided name.
+   *
+   * @param name the name to be matched to a zone.
+   * @return the zone.
+   */
+  @Override public Zone findBestZone(Name name) {
+    Zone foundzone = null;
+    foundzone = zones.get(name);
+    if (foundzone != null) {
+      return foundzone;
+    }
+    int labels = name.labels();
+    for (int i = 1; i < labels; i++) {
+      Name tname = new Name(name, i);
+      foundzone = zones.get(tname);
+      if (foundzone != null) {
+        return foundzone;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Add the answer section to the response.
+   *
+   * @param response   the response message.
+   * @param name       the name of the answer record.
+   * @param type       the type of record.
+   * @param dclass     the DNS class.
+   * @param iterations iteration count.
+   * @param flags
+   * @return the response code.
+   */
+  byte addAnswer(Message response, Name name, int type, int dclass,
+      int iterations, int flags) {
+    SetResponse sr = null;
+    byte rcode = Rcode.NOERROR;
+
+    if (iterations > 6) {
+      return Rcode.NOERROR;
+    }
+
+    if (type == Type.SIG || type == Type.RRSIG) {
+      type = Type.ANY;
+      flags |= FLAG_SIGONLY;
+    }
+
+    Zone zone = findBestZone(name);
+
+    LOG.debug("finding record");
+    try (CloseableLock lock = readLock.lock()) {
+      if (zone != null) {
+        sr = zone.findRecords(name, type);
+      } else {
+        rcode = Rcode.NOTAUTH;
+      }
+    }
+    LOG.info("found record? {}", sr != null && sr.isSuccessful());
+
+    if (sr != null) {
+      if (sr.isCNAME()) {
+        CNAMERecord cname = sr.getCNAME();
+        RRset rrset = zone.findExactMatch(cname.getName(), Type.CNAME);
+        addRRset(name, response, rrset, Section.ANSWER, flags);
+        if (iterations == 0) {
+          response.getHeader().setFlag(Flags.AA);
+        }
+        rcode = addAnswer(response, cname.getTarget(),
+            type, dclass, iterations + 1, flags);
+      }
+      if (sr.isNXDOMAIN()) {
+        response.getHeader().setRcode(Rcode.NXDOMAIN);
+        if (isDNSSECEnabled()) {
+          try {
+            addNXT(response, flags);
+          } catch (Exception e) {
+            LOG.warn("Unable to add NXTRecord to AUTHORITY Section", e);
+          }
+        }
+        addSOA(response, zone, flags);
+        if (iterations == 0) {
+          response.getHeader().setFlag(Flags.AA);
+        }
+        rcode = Rcode.NXDOMAIN;
+      } else if (sr.isNXRRSET()) {
+        LOG.info("No data found the given name {} and type {}", name, type);
+        addSOA(response, zone, flags);
+        if (iterations == 0) {
+          response.getHeader().setFlag(Flags.AA);
+        }
+      } else if (sr.isSuccessful()) {
+        RRset[] rrsets = sr.answers();
+        LOG.info("found answers {}", rrsets);
+        for (int i = 0; i < rrsets.length; i++) {
+          addRRset(name, response, rrsets[i],
+              Section.ANSWER, flags);
+        }
+        addNS(response, zone, flags);
+        if (iterations == 0) {
+          response.getHeader().setFlag(Flags.AA);
+        }
+      }
+    } else {
+      if (zone != null) {
+        Name defaultDomain = null;
+        try {
+          defaultDomain = Name.fromString(domainName);
+          zone = zones.get(defaultDomain);
+          addNS(response, zone, flags);
+          if (iterations == 0) {
+            response.getHeader().setFlag(Flags.AA);
+          }
+        } catch (TextParseException e) {
+          LOG.warn("Unable to obtain default zone for unknown name response",
+              e);
+        }
+      }
+    }
+    return rcode;
+  }
+
+  /**
+   * Add the SOA record (describes the properties of the zone) to the authority
+   * section of the response.
+   *
+   * @param response the response message.
+   * @param zone     the DNS zone.
+   */
+  private void addSOA(Message response, Zone zone, int flags) {
+    RRset soa = zone.findExactMatch(zone.getOrigin(), Type.SOA);
+    addRRset(soa.getName(), response, soa,
+        Section.AUTHORITY, flags);
+  }
+
+  /**
+   * Add the NXT record to the authority
+   * section of the response.
+   *
+   * @param response the response message.
+   */
+  private void addNXT(Message response, int flags)
+      throws DNSSEC.DNSSECException, IOException {
+    Record nxtRecord = getNXTRecord(
+        response.getSectionArray(Section.QUESTION)[0]);
+    Zone zone = findBestZone(nxtRecord.getName());
+    addRecordCommand.exec(zone, nxtRecord);
+    RRset nxtRR = zone.findExactMatch(nxtRecord.getName(), Type.NXT);
+    addRRset(nxtRecord.getName(), response, nxtRR, Section.AUTHORITY, flags);
+
+    removeRecordCommand.exec(zone, nxtRecord);
+  }
+
+  /**
+   * Return an NXT record required to validate negative responses.  If there is
+   * an issue returning the NXT record, a SOA record will be returned.
+   *
+   * @param query the query record.
+   * @return an NXT record.
+   */
+  private Record getNXTRecord(Record query) {
+    Record response = null;
+    SecureableZone zone = (SecureableZone) findBestZone(query.getName());
+    if (zone != null) {
+      response = zone.getNXTRecord(query, zone);
+      if (response == null) {
+        response = zone.getSOA();
+      }
+    }
+
+    return response;
+  }
+
+  /**
+   * Add the name server info to the authority section.
+   *
+   * @param response the response message.
+   * @param zone     the DNS zone.
+   * @param flags    the flags.
+   */
+  private void addNS(Message response, Zone zone, int flags) {
+    RRset nsRecords = zone.getNS();
+    addRRset(nsRecords.getName(), response, nsRecords,
+        Section.AUTHORITY, flags);
+  }
+
+  /**
+   * Add the provided record set to the response section specified.
+   *
+   * @param name     the name associated with the record set.
+   * @param response the response message.
+   * @param rrset    the record set.
+   * @param section  the response section to which the record set will be added.
+   * @param flags    the flags.
+   */
+  private void addRRset(Name name, Message response, RRset rrset, int section,
+      int flags) {
+    for (int s = 1; s <= section; s++) {
+      if (response.findRRset(name, rrset.getType(), s)) {
+        return;
+      }
+    }
+    if ((flags & FLAG_SIGONLY) == 0) {
+      Iterator it = rrset.rrs();
+      while (it.hasNext()) {
+        Record r = (Record) it.next();
+        if (r.getName().isWild() && !name.isWild()) {
+          r = r.withName(name);
+        }
+        response.addRecord(r, section);
+      }
+    }
+    if ((flags & (FLAG_SIGONLY | FLAG_DNSSECOK)) != 0) {
+      Iterator it = rrset.sigs();
+      while (it.hasNext()) {
+        Record r = (Record) it.next();
+        if (r.getName().isWild() && !name.isWild()) {
+          r = r.withName(name);
+        }
+        response.addRecord(r, section);
+      }
+    }
+  }
+
+  /**
+   * Perform a zone transfer.
+   *
+   * @param name  the zone name.
+   * @param query the query.
+   * @param tsig  the query signature.
+   * @param qtsig the signature record.
+   * @param s     the connection socket.
+   * @return      an error message if there is no matching zone
+   * or null due to error.
+   */
+  byte[] doAXFR(Name name, Message query, TSIG tsig, TSIGRecord qtsig,
+      Socket s) {
+    boolean first = true;
+    Zone zone = findBestZone(name);
+    if (zone == null) {
+      return errorMessage(query, Rcode.REFUSED);
+    }
+    Iterator it = zone.AXFR();
+    try {
+      DataOutputStream dataOut;
+      dataOut = new DataOutputStream(s.getOutputStream());
+      int id = query.getHeader().getID();
+      while (it.hasNext()) {
+        RRset rrset = (RRset) it.next();
+        Message response = new Message(id);
+        Header header = response.getHeader();
+        header.setFlag(Flags.QR);
+        header.setFlag(Flags.AA);
+        addRRset(rrset.getName(), response, rrset,
+            Section.ANSWER, FLAG_DNSSECOK);
+        if (tsig != null) {
+          tsig.applyStream(response, qtsig, first);
+          qtsig = response.getTSIG();
+        }
+        first = false;
+        byte[] out = response.toWire();
+        dataOut.writeShort(out.length);
+        dataOut.write(out);
+      }
+    } catch (IOException ex) {
+      System.out.println("AXFR failed");
+    }
+    try {
+      s.close();
+    } catch (IOException ex) {
+    }
+    return null;
+  }
+
+  /**
+   * Perform the registry operation (register or delete).  This method will take
+   * the provided service record and either add or remove the DNS records
+   * indicated.
+   *
+   * @param path    the ZK path for the service record.
+   * @param record  the service record.
+   * @param command the registry command (REGISTER or DELETE).
+   * @throws IOException if the is an error performing registry operation.
+   */
+  private void op(String path, ServiceRecord record, RegistryCommand command)
+      throws IOException {
+    ServiceRecordProcessor processor;
+    try {
+      if (record.get(YarnRegistryAttributes.YARN_PERSISTENCE)
+          .equals(CONTAINER)) {
+        // container registration.  the logic to identify and create the
+        // container entry needs to be enhanced/more accurate and associate to
+        // correct host
+        processor =
+            new ContainerServiceRecordProcessor(record, path, domainName, this);
+      } else {
+        processor =
+            new ApplicationServiceRecordProcessor(record, path, domainName,
+                this);
+      }
+      processor.manageDNSRecords(command);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+  }
+
+  /**
+   * Return the username found in the ZK path.
+   *
+   * @param path the ZK path.
+   * @return the user name.
+   */
+  private String getUsername(String path) {
+    String user = "anonymous";
+    Matcher matcher = USER_NAME.matcher(path);
+    if (matcher.find()) {
+      user = matcher.group(1);
+    }
+    return user;
+  }
+
+  /**
+   * Register DNS records based on the provided service record.
+   *
+   * @param path   the ZK path of the service record.
+   * @param record record providing DNS registration info.
+   * @throws IOException if registration causes an error.
+   */
+  @Override
+  public void register(String path, ServiceRecord record) throws IOException {
+    op(path, record, addRecordCommand);
+  }
+
+  /**
+   * Delete the DNS records generated by the provided service record.
+   *
+   * @param path   the ZK path for the given record.
+   * @param record the service record
+   * @throws IOException if deletion causes and error.
+   */
+  @Override
+  public void delete(String path, ServiceRecord record) throws IOException {
+    op(path, record, removeRecordCommand);
+  }
+
+  /**
+   * An interface representing a registry associated function/command (see
+   * command pattern).
+   */
+  interface RegistryCommand {
+    void exec(Zone zone, Record record) throws IOException;
+
+    String getLogDescription();
+  }
+
+  /**
+   * The "add record" command.
+   */
+  private final RegistryCommand addRecordCommand = new RegistryCommand() {
+    @Override
+    public void exec(Zone zone, Record record) throws IOException {
+      if (zone != null) {
+        try (CloseableLock lock = writeLock.lock()) {
+          zone.addRecord(record);
+          LOG.info("Registered {}", record);
+          if (isDNSSECEnabled()) {
+            Calendar cal = Calendar.getInstance();
+            Date inception = cal.getTime();
+            cal.add(Calendar.YEAR, 1);
+            Date expiration = cal.getTime();
+            RRset rRset =
+                zone.findExactMatch(record.getName(), record.getType());
+            try {
+              DNSKEYRecord dnskeyRecord = dnsKeyRecs.get(zone.getOrigin());
+              RRSIGRecord rrsigRecord =
+                  DNSSEC.sign(rRset, dnskeyRecord, privateKey,
+                      inception, expiration);
+              LOG.info("Adding {}", rrsigRecord);
+              rRset.addRR(rrsigRecord);
+
+              //addDSRecord(zone, record.getName(), record.getDClass(),
+              //  record.getTTL(), inception, expiration);
+
+            } catch (DNSSEC.DNSSECException e) {
+              throw new IOException(e);
+            }
+          }
+        }
+      } else {
+        LOG.warn("Unable to find zone matching record {}", record);
+      }
+    }
+
+    /**
+     * Add a DS record associated with the input name.
+     * @param zone  the zone.
+     * @param name  the record name.
+     * @param dClass the DNS class.
+     * @param dsTtl the ttl value.
+     * @param inception  the time of inception of the record.
+     * @param expiration  the expiry time of the record.
+     * @throws DNSSEC.DNSSECException if the addition of DS record fails.
+     */
+    private void addDSRecord(Zone zone,
+        Name name, int dClass, long dsTtl,
+        Date inception,
+        Date expiration) throws DNSSEC.DNSSECException {
+      RRset rRset;
+      RRSIGRecord rrsigRecord;
+
+      DNSKEYRecord dnskeyRecord = dnsKeyRecs.get(zone.getOrigin());
+      DSRecord dsRecord = new DSRecord(name, dClass,
+          dsTtl, DSRecord.Digest.SHA1,
+          dnskeyRecord);
+      zone.addRecord(dsRecord);
+      LOG.info("Adding {}", dsRecord);
+      rRset = zone.findExactMatch(dsRecord.getName(), dsRecord.getType());
+
+      rrsigRecord = DNSSEC.sign(rRset, dnskeyRecord, privateKey,
+          inception, expiration);
+      rRset.addRR(rrsigRecord);
+    }
+
+    @Override
+    public String getLogDescription() {
+      return "Registering ";
+    }
+  };
+
+  /**
+   * The "remove record" command.
+   */
+  private final RegistryCommand removeRecordCommand = new RegistryCommand() {
+    @Override
+    public void exec(Zone zone, Record record) throws IOException {
+      zone.removeRecord(record);
+      LOG.info("Removed {}", record);
+      if (isDNSSECEnabled()) {
+        RRset rRset = zone.findExactMatch(record.getName(), Type.DS);
+        if (rRset != null) {
+          zone.removeRecord(rRset.first());
+        }
+      }
+    }
+
+    @Override
+    public String getLogDescription() {
+      return "Deleting ";
+    }
+  };
+
+  /**
+   * An implementation allowing for obtaining and releasing a lock.
+   */
+  public static class CloseableLock implements AutoCloseable {
+    private Lock lock;
+
+    public CloseableLock(Lock lock) {
+      this.lock = lock;
+    }
+
+    public CloseableLock lock() {
+      lock.lock();
+      return this;
+    }
+
+    @Override
+    public void close() {
+      lock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
new file mode 100644
index 0000000..faa5fe1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.DNSOperationsFactory;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.impl.zk.PathListener;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A server/service that starts and manages the lifecycle of a DNS registry
+ * instance.
+ */
+public class RegistryDNSServer extends CompositeService {
+
+
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  private RegistryDNS registryDNS;
+  private RegistryOperationsService registryOperations;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryDNS.class);
+  private ConcurrentMap<String, ServiceRecord> pathToRecordMap;
+
+  /**
+   * Creates the DNS server.
+   * @param name the server name.
+   */
+  public RegistryDNSServer(String name) {
+    super(name);
+  }
+
+  /**
+   * Initializes the DNS server.
+   * @param conf the hadoop configuration instance.
+   * @throws Exception if service initialization fails.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    pathToRecordMap = new ConcurrentHashMap<>();
+
+    registryOperations = new RegistryOperationsService("RegistryDNSOperations");
+    addService(registryOperations);
+
+    // probably need to populate with existing apps?
+    registryDNS = (RegistryDNS) DNSOperationsFactory.createInstance(conf);
+    addService(registryDNS);
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the server.
+   * @throws Exception if service start fails.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    manageRegistryDNS();
+  }
+
+  /**
+   * Performs operations required to setup the DNS registry instance (e.g. sets
+   * up a path listener to react to service record creation/deletion and invoke
+   * the appropriate registry method).
+   */
+  private void manageRegistryDNS() {
+
+    try {
+      registryOperations.monitorRegistryEntries();
+      registryOperations.registerPathListener(new PathListener() {
+        private String registryRoot = getConfig().
+            get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+                RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+
+        @Override
+        public void nodeAdded(String path) throws IOException {
+          // get a listing of service records
+          String relativePath = getPathRelativeToRegistryRoot(path);
+          String child = RegistryPathUtils.lastPathEntry(path);
+          Map<String, RegistryPathStatus> map = new HashMap<>();
+          map.put(child, registryOperations.stat(relativePath));
+          Map<String, ServiceRecord> records =
+              RegistryUtils.extractServiceRecords(registryOperations,
+                                                  getAdjustedParentPath(path),
+                                                  map);
+          processServiceRecords(records, register);
+          pathToRecordMap.putAll(records);
+        }
+
+        private String getAdjustedParentPath(String path) {
+          Preconditions.checkNotNull(path);
+          String adjustedPath = null;
+          adjustedPath = getPathRelativeToRegistryRoot(path);
+          try {
+            return RegistryPathUtils.parentOf(adjustedPath);
+          } catch (PathNotFoundException e) {
+            // attempt to use passed in path
+            return path;
+          }
+        }
+
+        private String getPathRelativeToRegistryRoot(String path) {
+          String adjustedPath;
+          if (path.equals(registryRoot)) {
+            adjustedPath = "/";
+          } else {
+            adjustedPath = path.substring(registryRoot.length());
+          }
+          return adjustedPath;
+        }
+
+        @Override
+        public void nodeRemoved(String path) throws IOException {
+          ServiceRecord record = pathToRecordMap.remove(path.substring(
+              registryRoot.length()));
+          processServiceRecord(path, record, delete);
+        }
+
+      });
+
+      // create listener for record deletions
+
+    } catch (Exception e) {
+      LOG.warn("Unable to monitor the registry.  DNS support disabled.", e);
+    }
+  }
+
+  /**
+   * A registry management command interface.
+   */
+  interface ManagementCommand {
+    void exec(String path, ServiceRecord record) throws IOException;
+  }
+
+  /**
+   * Performs registry service record registration.
+   */
+  private final ManagementCommand register = new ManagementCommand() {
+    @Override
+    public void exec(String path, ServiceRecord record) throws IOException {
+      if (record != null) {
+        LOG.info("Registering DNS records for {}", path);
+        registryDNS.register(path, record);
+      }
+    }
+  };
+
+  /**
+   * Performs registry service record deletion.
+   */
+  private ManagementCommand delete = new ManagementCommand() {
+    @Override
+    public void exec(String path, ServiceRecord record) throws IOException {
+      if (record != null) {
+        LOG.info("Deleting DNS records for {}", path);
+        registryDNS.delete(path, record);
+      }
+    }
+  };
+
+  /**
+   * iterates thru the supplied service records, executing the provided registry
+   * command.
+   * @param records the service records.
+   * @param command the registry command.
+   * @throws IOException
+   */
+  private void processServiceRecords(Map<String, ServiceRecord> records,
+                                     ManagementCommand command)
+      throws IOException {
+    for (Map.Entry<String, ServiceRecord> entry : records.entrySet()) {
+      processServiceRecord(entry.getKey(), entry.getValue(), command);
+    }
+  }
+
+  /**
+   * Process the service record, parsing the information and creating the
+   * required DNS records.
+   * @param path  the service record path.
+   * @param record  the record.
+   * @param command  the registry command to execute.
+   * @throws IOException
+   */
+  private void processServiceRecord(String path, ServiceRecord record,
+                                     ManagementCommand command)
+      throws IOException {
+    command.exec(path, record);
+  }
+
+  /**
+   * Launch the server.
+   * @param args command line args.
+   * @return
+   */
+  static RegistryDNSServer launchDNSServer(String[] args) {
+    RegistryDNSServer dnsServer = null;
+
+    Thread
+        .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(RegistryDNSServer.class, args,
+                                       LOG);
+    try {
+      dnsServer = new RegistryDNSServer("RegistryDNSServer");
+      ShutdownHookManager.get().addShutdownHook(
+          new CompositeService.CompositeServiceShutdownHook(dnsServer),
+          SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration();
+      processCommandLine(args, conf);
+      new GenericOptionsParser(conf, args);
+      dnsServer.init(conf);
+      dnsServer.start();
+    } catch (Throwable t) {
+      LOG.error("Error starting Registry DNS Server", t);
+      ExitUtil.terminate(-1, "Error starting Registry DNS Server");
+    }
+    return dnsServer;
+  }
+
+  /**
+   * Process input command line arguments.
+   * @param args the command line argument array.
+   * @param conf  the configuration.
+   */
+  private static void processCommandLine(String[] args,
+                                         YarnConfiguration conf) {
+    Options options = new Options();
+    options.addOption("p", "port", true,
+                      "the server listening port (override)");
+
+    CommandLineParser parser = new BasicParser();
+    try {
+      CommandLine cmd = parser.parse(options, args);
+      if (cmd.hasOption("p")) {
+        conf.set(RegistryConstants.KEY_DNS_PORT, cmd.getOptionValue("p"));
+      }
+    } catch (ParseException e) {
+      LOG.error("Error parsing the command line options", e);
+    }
+  }
+
+  /**
+   * Lanches the server instance.
+   * @param args the command line args.
+   */
+  public static void main(String[] args) {
+    launchDNSServer(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java
new file mode 100644
index 0000000..4b0a852
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.NXTRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.RRset;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SetResponse;
+import org.xbill.DNS.Type;
+import org.xbill.DNS.Zone;
+import org.xbill.DNS.ZoneTransferException;
+import org.xbill.DNS.ZoneTransferIn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A zone implementation geared to support some DNSSEC functionality.
+ */
+public class SecureableZone extends Zone {
+  private List<Record> records;
+
+  /**
+   * Creates a Zone by doing the specified zone transfer.
+   * @param xfrin The incoming zone transfer to execute.
+   * @throws IOException if there is an error.
+   * @throws ZoneTransferException if there is an error.
+   */
+  public SecureableZone(ZoneTransferIn xfrin)
+      throws IOException, ZoneTransferException {
+    super(xfrin);
+  }
+
+  /**
+   * Creates a Zone by performing a zone transfer to the specified host.
+   * @param zone  zone name.
+   * @param dclass the dclass
+   * @param remote  the remote host.
+   * @throws IOException if there is an error.
+   * @throws ZoneTransferException if there is an error.
+   */
+  public SecureableZone(Name zone, int dclass, String remote)
+      throws IOException, ZoneTransferException {
+    super(zone, dclass, remote);
+  }
+
+  /**
+   * Creates a Zone from the records in the specified master file.
+   * @param zone The name of the zone.
+   * @param file The master file to read from.
+   * @throws IOException if there is an error.
+   */
+  public SecureableZone(Name zone, String file) throws IOException {
+    super(zone, file);
+  }
+
+  /**
+   * Creates a Zone from an array of records.
+   * @param zone The name of the zone.
+   * @param records The records to add to the zone.
+   * @throws IOException if there is an error.
+   */
+  public SecureableZone(Name zone, Record[] records)
+      throws IOException {
+    super(zone, records);
+  }
+
+  /**
+   * Adds a Record to the Zone.
+   * @param r The record to be added
+   * @see Record
+   */
+  @Override public void addRecord(Record r) {
+    if (records == null) {
+      records = new ArrayList<Record>();
+    }
+    super.addRecord(r);
+    records.add(r);
+  }
+
+  /**
+   * Removes a record from the Zone.
+   * @param r The record to be removed
+   * @see Record
+   */
+  @Override public void removeRecord(Record r) {
+    if (records == null) {
+      records = new ArrayList<Record>();
+    }
+    super.removeRecord(r);
+    records.remove(r);
+  }
+
+  /**
+   * Return a NXT record appropriate for the query.
+   * @param queryRecord the query record.
+   * @param zone the zone to search.
+   * @return  the NXT record describing the insertion point.
+   */
+  @SuppressWarnings({"unchecked"})
+  public Record getNXTRecord(Record queryRecord, Zone zone) {
+    Collections.sort(records);
+
+    int index = Collections.binarySearch(records, queryRecord,
+        new Comparator<Record>() {
+          @Override public int compare(Record r1, Record r2) {
+            return r1.compareTo(r2);
+          }
+        });
+    if (index >= 0) {
+      return null;
+    }
+    index = -index - 1;
+    if (index >= records.size()) {
+      index = records.size() - 1;
+    }
+    Record base = records.get(index);
+    SetResponse sr = zone.findRecords(base.getName(), Type.ANY);
+    BitSet bitMap = new BitSet();
+    bitMap.set(Type.NXT);
+    RRset[] rRsets = sr.answers();
+    for (RRset rRset : rRsets) {
+      int typeCode = rRset.getType();
+      if (typeCode > 0 && typeCode < 128) {
+        bitMap.set(typeCode);
+      }
+    }
+    return new NXTRecord(base.getName(), DClass.IN, zone.getSOA().getMinimum(),
+        queryRecord.getName(), bitMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java
new file mode 100644
index 0000000..b67cc7d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+import java.io.IOException;
+
+/**
+ *  Manage the processing of service records in order to create DNS records.
+ */
+public interface ServiceRecordProcessor {
+  /**
+   * Initialize the mapping between DNS record type and record information
+   * for the given service record.
+   * @param serviceRecord  the registry service record.
+   * @throws Exception if encountering an error.
+   */
+  void initTypeToInfoMapping(ServiceRecord serviceRecord)
+      throws Exception;
+
+  /**
+   * Return the DNS record types valid for this processor.
+   * @return  the array of DNS record types.
+   */
+  int[] getRecordTypes();
+
+  /**
+   * Manage the creation and registration of DNS records generated by parsing
+   * a service record.
+   * @param command  the DNS registration command object (e.g. add_record,
+   *                 remove record)
+   * @throws IOException if the creation or registration generates an issue.
+   */
+  void manageDNSRecords(RegistryDNS.RegistryCommand command)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java
new file mode 100644
index 0000000..5043b85
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.xbill.DNS.Name;
+import org.xbill.DNS.Zone;
+
+/**
+ * A selector that returns the zone associated with a provided name.
+ */
+public interface ZoneSelector {
+  /**
+   * Finds the best matching zone given the provided name.
+   * @param name the record name for which a zone is requested.
+   * @return the matching zone.
+   */
+  Zone findBestZone(Name name);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java
new file mode 100644
index 0000000..00d8c9db
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * DNS Server classes.
+ * <p>
+ *   These classes are leveraged to create a DNS server that can provide the
+ *   facilities necessary for YARN application and/or service discovery.
+ * </p>
+ */
+package org.apache.hadoop.registry.server.dns;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/3] hadoop git commit: YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron

Posted by ji...@apache.org.
YARN-5218. Initial core change for DNS for YARN. Contributed by Jonathan Maron


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f3c3a22
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f3c3a22
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f3c3a22

Branch: refs/heads/YARN-4757
Commit: 8f3c3a22a27455b1e0851455336827384abef101
Parents: be34e85
Author: Jian He <ji...@apache.org>
Authored: Sun Jun 12 11:32:03 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Sun Jun 12 11:36:41 2016 -0700

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |   16 +-
 .../dev-support/findbugs-exclude.xml            |   16 +
 .../hadoop-yarn/hadoop-yarn-registry/pom.xml    |    5 +
 .../registry/client/api/DNSOperations.java      |   60 +
 .../client/api/DNSOperationsFactory.java        |   78 +
 .../registry/client/api/RegistryConstants.java  |  111 +-
 .../registry/client/impl/zk/CuratorService.java |  266 ++-
 .../registry/client/impl/zk/ListenerHandle.java |   25 +
 .../registry/client/impl/zk/PathListener.java   |   30 +
 .../types/yarn/YarnRegistryAttributes.java      |   16 +-
 .../dns/ApplicationServiceRecordProcessor.java  |  353 ++++
 .../server/dns/BaseServiceRecordProcessor.java  |  469 ++++++
 .../dns/ContainerServiceRecordProcessor.java    |  278 ++++
 .../server/dns/RecordCreatorFactory.java        |  275 ++++
 .../hadoop/registry/server/dns/RegistryDNS.java | 1534 ++++++++++++++++++
 .../registry/server/dns/RegistryDNSServer.java  |  290 ++++
 .../registry/server/dns/SecureableZone.java     |  151 ++
 .../server/dns/ServiceRecordProcessor.java      |   53 +
 .../registry/server/dns/ZoneSelector.java       |   33 +
 .../registry/server/dns/package-info.java       |   26 +
 .../registry/server/dns/TestRegistryDNS.java    |  561 +++++++
 .../server/dns/TestSecureRegistryDNS.java       |   44 +
 .../test/resources/0.17.172.in-addr.arpa.zone   |   36 +
 .../src/test/resources/test.private             |   32 +
 24 files changed, 4665 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index aa47f6c..d1b5209 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -82,6 +82,7 @@
     <zookeeper.version>3.4.6</zookeeper.version>
     <curator.version>2.7.1</curator.version>
     <findbugs.version>3.0.0</findbugs.version>
+    <dnsjava.version>2.1.7</dnsjava.version>
 
     <tomcat.version>6.0.44</tomcat.version>
 
@@ -1005,11 +1006,16 @@
         </exclusions>
       </dependency>
 
-        <dependency>
-            <groupId>org.skyscreamer</groupId>
-            <artifactId>jsonassert</artifactId>
-            <version>1.3.0</version>
-        </dependency>
+      <dependency>
+          <groupId>org.skyscreamer</groupId>
+          <artifactId>jsonassert</artifactId>
+          <version>1.3.0</version>
+      </dependency>
+      <dependency>
+        <groupId>dnsjava</groupId>
+        <artifactId>dnsjava</artifactId>
+        <version>${dnsjava.version}</version>
+      </dependency>
 
     </dependencies>
   </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 81c7e6a..48c8577 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -520,4 +520,20 @@
     <Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
     <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
   </Match>
+
+  <Match>
+    <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+    <Method name="addNIOTCP" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+    <Method name="addNIOUDP" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.registry.server.dns.RegistryDNS" />
+    <Method name="serveNIOTCP" />
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
index 46d8a59..5f1196c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/pom.xml
@@ -98,6 +98,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>dnsjava</groupId>
+      <artifactId>dnsjava</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
new file mode 100644
index 0000000..3abfb6c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.service.Service;
+
+import java.io.IOException;
+
+/**
+ * DNS Operations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface DNSOperations extends Service {
+
+  /**
+   * Register a service based on a service record.
+   *
+   * @param path the ZK path.
+   * @param record record providing DNS registration info.
+   * @throws IOException Any other IO Exception.
+   */
+  void register(String path, ServiceRecord record)
+      throws IOException;
+
+
+  /**
+   * Delete a service's registered endpoints.
+   *
+   * If the operation returns without an error then the entry has been
+   * deleted.
+   *
+   * @param path the ZK path.
+   * @param record service record
+   * @throws IOException Any other IO Exception
+   *
+   */
+  void delete(String path, ServiceRecord record)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
new file mode 100644
index 0000000..1a8bb3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.server.dns.RegistryDNS;
+
+/**
+ * A factory for DNS operation service instances.
+ */
+public final class DNSOperationsFactory implements RegistryConstants {
+
+  /**
+   * DNS Implementation type.
+   */
+  public enum DNSImplementation {
+    DNSJAVA
+  }
+
+  private DNSOperationsFactory() {
+  }
+
+  /**
+   * Create and initialize a DNS operations instance.
+   *
+   * @param conf configuration
+   * @return a DNS operations instance
+   */
+  public static DNSOperations createInstance(Configuration conf) {
+    return createInstance("DNSOperations", DNSImplementation.DNSJAVA, conf);
+  }
+
+  /**
+   * Create and initialize a registry operations instance.
+   * Access rights will be determined from the configuration.
+   *
+   * @param name name of the instance
+   * @param impl the DNS implementation.
+   * @param conf configuration
+   * @return a registry operations instance
+   */
+  public static DNSOperations createInstance(String name,
+      DNSImplementation impl,
+      Configuration conf) {
+    Preconditions.checkArgument(conf != null, "Null configuration");
+    DNSOperations operations = null;
+    switch (impl) {
+    case DNSJAVA:
+      operations = new RegistryDNS(name);
+      break;
+
+    default:
+      throw new IllegalArgumentException(
+          String.format("%s is not available", impl.toString()));
+    }
+
+    //operations.init(conf);
+    return operations;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
index a6fe216..7115a4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
@@ -44,17 +44,106 @@ public interface RegistryConstants {
   String ZK_PREFIX = REGISTRY_PREFIX + "zk.";
 
   /**
+   * Prefix for dns-specific options: {@value}
+   *  <p>
+   * For clients using other protocols, these options are not supported.
+   */
+  String DNS_PREFIX = REGISTRY_PREFIX + "dns.";
+
+  /**
    * flag to indicate whether or not the registry should
-   * be enabled in the RM: {@value}
+   * be enabled in the RM: {@value}.
    */
   String KEY_REGISTRY_ENABLED = REGISTRY_PREFIX + "rm.enabled";
 
   /**
-   * Defaut value for enabling the registry in the RM: {@value}
+   * Defaut value for enabling the registry in the RM: {@value}.
    */
   boolean DEFAULT_REGISTRY_ENABLED = false;
 
   /**
+   * flag to indicate whether or not the registry should
+   * be enabled in the RM: {@value}.
+   */
+  String KEY_DNS_ENABLED = DNS_PREFIX + "enabled";
+
+  /**
+   * Defaut value for enabling the DNS in the Registry: {@value}.
+   */
+  boolean DEFAULT_DNS_ENABLED = false;
+
+  /**
+   * DNS domain name key.
+   */
+  String KEY_DNS_DOMAIN = DNS_PREFIX + "domain-name";
+
+  /**
+   * DNS bind address.
+   */
+  String KEY_DNS_BIND_ADDRESS = DNS_PREFIX + "bind-address";
+
+  /**
+   * DNS port number key.
+   */
+  String KEY_DNS_PORT = DNS_PREFIX + "bind-port";
+
+  /**
+   * Default DNS port number.
+   */
+  int DEFAULT_DNS_PORT = 53;
+
+  /**
+   * DNSSEC Enabled?
+   */
+  String KEY_DNSSEC_ENABLED = DNS_PREFIX + "dnssec.enabled";
+
+  /**
+   * DNSSEC Enabled?
+   */
+  String KEY_DNSSEC_PUBLIC_KEY = DNS_PREFIX + "public-key";
+
+  /**
+   * DNSSEC private key file.
+   */
+  String KEY_DNSSEC_PRIVATE_KEY_FILE = DNS_PREFIX + "private-key-file";
+
+  /**
+   * Default DNSSEC private key file path.
+   */
+  String DEFAULT_DNSSEC_PRIVATE_KEY_FILE =
+      "/etc/hadoop/conf/registryDNS.private";
+
+  /**
+   * Zone subnet.
+   */
+  String KEY_DNS_ZONE_SUBNET = DNS_PREFIX + "zone-subnet";
+
+  /**
+   * Zone subnet mask.
+   */
+  String KEY_DNS_ZONE_MASK = DNS_PREFIX + "zone-mask";
+
+  /**
+   * Zone subnet IP min.
+   */
+  String KEY_DNS_ZONE_IP_MIN = DNS_PREFIX + "zone-ip-min";
+
+  /**
+   * Zone subnet IP max.
+   */
+  String KEY_DNS_ZONE_IP_MAX = DNS_PREFIX + "zone-ip-max";
+
+  /**
+   * DNS Record TTL.
+   */
+  String KEY_DNS_TTL = DNS_PREFIX + "dns-ttl";
+
+  /**
+   * DNS Record TTL.
+   */
+  String KEY_DNS_ZONES_DIR = DNS_PREFIX + "zones-dir";
+
+  /**
    * Key to set if the registry is secure: {@value}.
    * Turning it on changes the permissions policy from "open access"
    * to restrictions on kerberos with the option of
@@ -69,12 +158,12 @@ public interface RegistryConstants {
   boolean DEFAULT_REGISTRY_SECURE = false;
 
   /**
-   * Root path in the ZK tree for the registry: {@value}
+   * Root path in the ZK tree for the registry: {@value}.
    */
   String KEY_REGISTRY_ZK_ROOT = ZK_PREFIX + "root";
 
   /**
-   * Default root of the yarn registry: {@value}
+   * Default root of the yarn registry: {@value}.
    */
   String DEFAULT_ZK_REGISTRY_ROOT = "/registry";
 
@@ -92,7 +181,7 @@ public interface RegistryConstants {
 
   /**
    * Registry client uses Kerberos: authentication is automatic from
-   * logged in user
+   * logged in user.
    */
   String REGISTRY_CLIENT_AUTH_KERBEROS = "kerberos";
 
@@ -104,12 +193,12 @@ public interface RegistryConstants {
   String REGISTRY_CLIENT_AUTH_DIGEST = "digest";
 
   /**
-   * No authentication; client is anonymous
+   * No authentication; client is anonymous.
    */
   String REGISTRY_CLIENT_AUTH_ANONYMOUS = "";
 
   /**
-   * Registry client authentication ID
+   * Registry client authentication ID.
    * <p>
    * This is only used in secure clusters with
    * {@link #KEY_REGISTRY_CLIENT_AUTH} set to
@@ -134,17 +223,17 @@ public interface RegistryConstants {
 
   /**
    * List of hostname:port pairs defining the
-   * zookeeper quorum binding for the registry {@value}
+   * zookeeper quorum binding for the registry {@value}.
    */
   String KEY_REGISTRY_ZK_QUORUM = ZK_PREFIX + "quorum";
 
   /**
-   * The default zookeeper quorum binding for the registry: {@value}
+   * The default zookeeper quorum binding for the registry: {@value}.
    */
   String DEFAULT_REGISTRY_ZK_QUORUM = "localhost:2181";
 
   /**
-   * Zookeeper session timeout in milliseconds: {@value}
+   * Zookeeper session timeout in milliseconds: {@value}.
    */
   String KEY_REGISTRY_ZK_SESSION_TIMEOUT =
       ZK_PREFIX + "session.timeout.ms";
@@ -259,7 +348,7 @@ public interface RegistryConstants {
   String KEY_REGISTRY_CLIENT_JAAS_CONTEXT = REGISTRY_PREFIX + "jaas.context";
 
   /**
-   * default client-side registry JAAS context: {@value}
+   * default client-side registry JAAS context: {@value}.
    */
   String DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT = "Client";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
index 7f35c3f..ad008c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
@@ -28,6 +28,9 @@ import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.DeleteBuilder;
 import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -36,14 +39,14 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException;
 import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException;
 import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
 import org.apache.hadoop.registry.client.exceptions.RegistryIOException;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -69,12 +72,12 @@ public class CuratorService extends CompositeService
       LoggerFactory.getLogger(CuratorService.class);
 
   /**
-   * the Curator binding
+   * the Curator binding.
    */
   private CuratorFramework curator;
 
   /**
-   * Path to the registry root
+   * Path to the registry root.
    */
   private String registryRoot;
 
@@ -85,17 +88,17 @@ public class CuratorService extends CompositeService
   private final RegistryBindingSource bindingSource;
 
   /**
-   * Security service
+   * Security service.
    */
   private RegistrySecurity registrySecurity;
 
   /**
-   * the connection binding text for messages
+   * the connection binding text for messages.
    */
   private String connectionDescription;
 
   /**
-   * Security connection diagnostics
+   * Security connection diagnostics.
    */
   private String securityConnectionDiagnostics = "";
 
@@ -106,10 +109,16 @@ public class CuratorService extends CompositeService
   private EnsembleProvider ensembleProvider;
 
   /**
+   * Registry tree cache.
+   */
+  private TreeCache treeCache;
+
+  /**
    * Construct the service.
-   * @param name service name
+   *
+   * @param name          service name
    * @param bindingSource source of binding information.
-   * If null: use this instance
+   *                      If null: use this instance
    */
   public CuratorService(String name, RegistryBindingSource bindingSource) {
     super(name);
@@ -122,7 +131,8 @@ public class CuratorService extends CompositeService
 
   /**
    * Create an instance using this service as the binding source (i.e. read
-   * configuration options from the registry)
+   * configuration options from the registry).
+   *
    * @param name service name
    */
   public CuratorService(String name) {
@@ -131,7 +141,8 @@ public class CuratorService extends CompositeService
 
   /**
    * Init the service.
-   * This is where the security bindings are set up
+   * This is where the security bindings are set up.
+   *
    * @param conf configuration of the service
    * @throws Exception
    */
@@ -155,6 +166,7 @@ public class CuratorService extends CompositeService
   /**
    * Start the service.
    * This is where the curator instance is started.
+   *
    * @throws Exception
    */
   @Override
@@ -167,29 +179,35 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Close the ZK connection if it is open
+   * Close the ZK connection if it is open.
    */
   @Override
   protected void serviceStop() throws Exception {
     IOUtils.closeStream(curator);
+
+    if (treeCache != null) {
+      treeCache.close();
+    }
     super.serviceStop();
   }
 
   /**
-   * Internal check that a service is in the live state
+   * Internal check that a service is in the live state.
+   *
    * @throws ServiceStateException if not
    */
   private void checkServiceLive() throws ServiceStateException {
     if (!isInState(STATE.STARTED)) {
       throw new ServiceStateException(
           "Service " + getName() + " is in wrong state: "
-          + getServiceState());
+              + getServiceState());
     }
   }
 
   /**
    * Flag to indicate whether or not the registry is secure.
    * Valid once the service is inited.
+   *
    * @return service security policy
    */
   public boolean isSecure() {
@@ -197,7 +215,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Get the registry security helper
+   * Get the registry security helper.
+   *
    * @return the registry security helper
    */
   protected RegistrySecurity getRegistrySecurity() {
@@ -205,7 +224,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Build the security diagnostics string
+   * Build the security diagnostics string.
+   *
    * @return a string for diagnostics
    */
   protected String buildSecurityDiagnostics() {
@@ -224,6 +244,7 @@ public class CuratorService extends CompositeService
    * Create a new curator instance off the root path; using configuration
    * options provided in the service configuration to set timeouts and
    * retry policy.
+   *
    * @return the newly created creator
    */
   private CuratorFramework createCurator() throws IOException {
@@ -250,14 +271,15 @@ public class CuratorService extends CompositeService
       // set the security options
 
       // build up the curator itself
-      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+      CuratorFrameworkFactory.Builder builder =
+          CuratorFrameworkFactory.builder();
       builder.ensembleProvider(ensembleProvider)
-       .connectionTimeoutMs(connectionTimeout)
-       .sessionTimeoutMs(sessionTimeout)
+          .connectionTimeoutMs(connectionTimeout)
+          .sessionTimeoutMs(sessionTimeout)
 
-       .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
-           retryCeiling,
-           retryTimes));
+          .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
+              retryCeiling,
+              retryTimes));
 
       // set up the builder AND any JVM context
       registrySecurity.applySecurityEnvironment(builder);
@@ -273,21 +295,23 @@ public class CuratorService extends CompositeService
   @Override
   public String toString() {
     return super.toString()
-           + " " + bindingDiagnosticDetails();
+        + " " + bindingDiagnosticDetails();
   }
 
   /**
-   * Get the binding diagnostics
+   * Get the binding diagnostics.
+   *
    * @return a diagnostics string valid after the service is started.
    */
   public String bindingDiagnosticDetails() {
     return " Connection=\"" + connectionDescription + "\""
-           + " root=\"" + registryRoot + "\""
-           + " " + securityConnectionDiagnostics;
+        + " root=\"" + registryRoot + "\""
+        + " " + securityConnectionDiagnostics;
   }
 
   /**
-   * Create a full path from the registry root and the supplied subdir
+   * Create a full path from the registry root and the supplied subdir.
+   *
    * @param path path of operation
    * @return an absolute path
    * @throws IllegalArgumentException if the path is invalide
@@ -299,6 +323,7 @@ public class CuratorService extends CompositeService
   /**
    * Get the registry binding source ... this can be used to
    * create new ensemble providers
+   *
    * @return the registry binding source in use
    */
   public RegistryBindingSource getBindingSource() {
@@ -308,23 +333,23 @@ public class CuratorService extends CompositeService
   /**
    * Create the ensemble provider for this registry, by invoking
    * {@link RegistryBindingSource#supplyBindingInformation()} on
-   * the provider stored in {@link #bindingSource}
+   * the provider stored in {@link #bindingSource}.
    * Sets {@link #ensembleProvider} to that value;
    * sets {@link #connectionDescription} to the binding info
    * for use in toString and logging;
-   *
    */
   protected void createEnsembleProvider() {
     BindingInformation binding = bindingSource.supplyBindingInformation();
     connectionDescription = binding.description
-                            + " " + securityConnectionDiagnostics;
+        + " " + securityConnectionDiagnostics;
     ensembleProvider = binding.ensembleProvider;
   }
 
   /**
    * Supply the binding information.
    * This implementation returns a fixed ensemble bonded to
-   * the quorum supplied by {@link #buildConnectionString()}
+   * the quorum supplied by {@link #buildConnectionString()}.
+   *
    * @return the binding information
    */
   @Override
@@ -339,17 +364,19 @@ public class CuratorService extends CompositeService
 
   /**
    * Override point: get the connection string used to connect to
-   * the ZK service
+   * the ZK service.
+   *
    * @return a registry quorum
    */
   protected String buildConnectionString() {
     return getConfig().getTrimmed(KEY_REGISTRY_ZK_QUORUM,
-        DEFAULT_REGISTRY_ZK_QUORUM);
+                                  DEFAULT_REGISTRY_ZK_QUORUM);
   }
 
   /**
-   * Create an IOE when an operation fails
-   * @param path path of operation
+   * Create an IOE when an operation fails.
+   *
+   * @param path      path of operation
    * @param operation operation attempted
    * @param exception caught the exception caught
    * @return an IOE to throw that contains the path and operation details.
@@ -361,8 +388,9 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Create an IOE when an operation fails
-   * @param path path of operation
+   * Create an IOE when an operation fails.
+   *
+   * @param path      path of operation
    * @param operation operation attempted
    * @param exception caught the exception caught
    * @return an IOE to throw that contains the path and operation details.
@@ -385,9 +413,10 @@ public class CuratorService extends CompositeService
     } else if (exception instanceof KeeperException.AuthFailedException) {
       ioe = new AuthenticationFailedException(path,
           "Authentication Failed: " + exception
-          + "; " + securityConnectionDiagnostics,
+              + "; " + securityConnectionDiagnostics,
           exception);
-    } else if (exception instanceof KeeperException.NoChildrenForEphemeralsException) {
+    } else if (exception instanceof
+        KeeperException.NoChildrenForEphemeralsException) {
       ioe = new NoChildrenForEphemeralsException(path,
           "Cannot create a path under an ephemeral node: " + exception,
           exception);
@@ -402,7 +431,7 @@ public class CuratorService extends CompositeService
     } else {
       ioe = new RegistryIOException(path,
           "Failure of " + operation + " on " + path + ": " +
-          exception.toString(),
+              exception.toString(),
           exception);
     }
     if (ioe.getCause() == null) {
@@ -417,8 +446,8 @@ public class CuratorService extends CompositeService
    * may create the same path before the create() operation is executed/
    * propagated to the ZK node polled.
    *
-   * @param path path to create
-   * @param acl ACL for path -used when creating a new entry
+   * @param path          path to create
+   * @param acl           ACL for path -used when creating a new entry
    * @param createParents flag to trigger parent creation
    * @return true iff the path was created
    * @throws IOException
@@ -432,10 +461,11 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Stat the file
+   * Stat the file.
+   *
    * @param path path of operation
    * @return a curator stat entry
-   * @throws IOException on a failure
+   * @throws IOException           on a failure
    * @throws PathNotFoundException if the path was not found
    */
   public Stat zkStat(String path) throws IOException {
@@ -457,7 +487,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Get the ACLs of a path
+   * Get the ACLs of a path.
+   *
    * @param path path of operation
    * @return a possibly empty list of ACLs
    * @throws IOException
@@ -481,12 +512,13 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Probe for a path existing
+   * Probe for a path existing.
+   *
    * @param path path of operation
    * @return true if the path was visible from the ZK server
    * queried.
    * @throws IOException on any exception other than
-   * {@link PathNotFoundException}
+   *                     {@link PathNotFoundException}
    */
   public boolean zkPathExists(String path) throws IOException {
     checkServiceLive();
@@ -503,7 +535,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Verify a path exists
+   * Verify a path exists.
+   *
    * @param path path of operation
    * @throws PathNotFoundException if the path is absent
    * @throws IOException
@@ -514,11 +547,12 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Create a directory. It is not an error if it already exists
-   * @param path path to create
-   * @param mode mode for path
+   * Create a directory. It is not an error if it already exists.
+   *
+   * @param path          path to create
+   * @param mode          mode for path
    * @param createParents flag to trigger parent creation
-   * @param acls ACL for path
+   * @param acls          ACL for path
    * @throws IOException any problem
    */
   public boolean zkMkPath(String path,
@@ -558,9 +592,10 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Recursively make a path
+   * Recursively make a path.
+   *
    * @param path path to create
-   * @param acl ACL for path
+   * @param acl  ACL for path
    * @throws IOException any problem
    */
   public void zkMkParentPath(String path,
@@ -574,7 +609,8 @@ public class CuratorService extends CompositeService
 
   /**
    * Create a path with given data. byte[0] is used for a path
-   * without data
+   * without data.
+   *
    * @param path path of operation
    * @param data initial data
    * @param acls
@@ -600,7 +636,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Update the data for a path
+   * Update the data for a path.
+   *
    * @param path path of operation
    * @param data new data
    * @throws IOException
@@ -620,13 +657,14 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Create or update an entry
-   * @param path path
-   * @param data data
-   * @param acl ACL for path -used when creating a new entry
+   * Create or update an entry.
+   *
+   * @param path      path
+   * @param data      data
+   * @param acl       ACL for path -used when creating a new entry
    * @param overwrite enable overwrite
-   * @throws IOException
    * @return true if the entry was created, false if it was simply updated.
+   * @throws IOException
    */
   public boolean zkSet(String path,
       CreateMode mode,
@@ -649,12 +687,13 @@ public class CuratorService extends CompositeService
 
   /**
    * Delete a directory/directory tree.
-   * It is not an error to delete a path that does not exist
-   * @param path path of operation
-   * @param recursive flag to trigger recursive deletion
+   * It is not an error to delete a path that does not exist.
+   *
+   * @param path               path of operation
+   * @param recursive          flag to trigger recursive deletion
    * @param backgroundCallback callback; this being set converts the operation
-   * into an async/background operation.
-   * task
+   *                           into an async/background operation.
+   *                           task
    * @throws IOException on problems other than no-such-path
    */
   public void zkDelete(String path,
@@ -682,7 +721,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * List all children of a path
+   * List all children of a path.
+   *
    * @param path path of operation
    * @return a possibly empty list of children
    * @throws IOException
@@ -703,7 +743,8 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Read data on a path
+   * Read data on a path.
+   *
    * @param path path of operation
    * @return the data
    * @throws IOException read failure
@@ -724,9 +765,10 @@ public class CuratorService extends CompositeService
   /**
    * Return a path dumper instance which can do a full dump
    * of the registry tree in its <code>toString()</code>
-   * operation
-   * @return a class to dump the registry
+   * operation.
+   *
    * @param verbose verbose flag - includes more details (such as ACLs)
+   * @return a class to dump the registry
    */
   public ZKPathDumper dumpPath(boolean verbose) {
     return new ZKPathDumper(curator, registryRoot, verbose);
@@ -734,7 +776,8 @@ public class CuratorService extends CompositeService
 
   /**
    * Add a new write access entry for all future write operations.
-   * @param id ID to use
+   *
+   * @param id   ID to use
    * @param pass password
    * @throws IOException on any failure to build the digest
    */
@@ -746,16 +789,16 @@ public class CuratorService extends CompositeService
   }
 
   /**
-   * Clear all write accessors
+   * Clear all write accessors.
    */
   public void clearWriteAccessors() {
     getRegistrySecurity().resetDigestACLs();
   }
 
-
   /**
    * Diagnostics method to dump a registry robustly.
-   * Any exception raised is swallowed
+   * Any exception raised is swallowed.
+   *
    * @param verbose verbose path dump
    * @return the registry tree
    */
@@ -769,4 +812,79 @@ public class CuratorService extends CompositeService
     }
     return "";
   }
+
+  /**
+   * Registers a listener to path related events.
+   *
+   * @param listener the listener.
+   * @return a handle allowing for the management of the listener.
+   * @throws Exception if registration fails due to error.
+   */
+  public ListenerHandle registerPathListener(final PathListener listener)
+      throws Exception {
+
+    final TreeCacheListener pathChildrenCacheListener =
+        new TreeCacheListener() {
+
+          public void childEvent(CuratorFramework curatorFramework,
+              TreeCacheEvent event)
+              throws Exception {
+            String path = null;
+            if (event != null && event.getData() != null) {
+              path = event.getData().getPath();
+            }
+            assert event != null;
+            switch (event.getType()) {
+            case NODE_ADDED:
+              LOG.info("Informing listener of added node {}", path);
+              listener.nodeAdded(path);
+
+              break;
+
+            case NODE_REMOVED:
+              LOG.info("Informing listener of removed node {}", path);
+              listener.nodeRemoved(path);
+
+              break;
+
+            case NODE_UPDATED:
+              LOG.info("Informing listener of updated node {}", path);
+              listener.nodeAdded(path);
+
+              break;
+
+            default:
+              // do nothing
+              break;
+
+            }
+          }
+        };
+    treeCache.getListenable().addListener(pathChildrenCacheListener);
+
+    return new ListenerHandle() {
+      @Override
+      public void remove() {
+        treeCache.getListenable().removeListener(pathChildrenCacheListener);
+      }
+    };
+
+  }
+
+  // TODO: should caches be stopped and then restarted if need be?
+
+  /**
+   * Create the tree cache that monitors the registry for node addition, update,
+   * and deletion.
+   *
+   * @throws Exception if any issue arises during monitoring.
+   */
+  public void monitorRegistryEntries()
+      throws Exception {
+    String registryPath =
+        getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+            RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+    treeCache = new TreeCache(curator, registryPath);
+    treeCache.start();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
new file mode 100644
index 0000000..e43dbbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ListenerHandle.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.client.impl.zk;
+
+/**
+ *
+ */
+public interface ListenerHandle {
+  void remove();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
new file mode 100644
index 0000000..db1e509
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/PathListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.client.impl.zk;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface PathListener {
+
+  void nodeAdded(String path) throws IOException;
+
+  void nodeRemoved(String path) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
index 7b78932..5eaa9c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
@@ -19,13 +19,23 @@
 package org.apache.hadoop.registry.client.types.yarn;
 
 /**
- * YARN specific attributes in the registry
+ * YARN specific attributes in the registry.
  */
-public class YarnRegistryAttributes {
+public final class YarnRegistryAttributes {
 
   /**
-   * ID. For containers: container ID. For application instances, application ID.
+   * Hidden constructor.
+   */
+  private YarnRegistryAttributes() {
+  }
+
+  /**
+   * ID. For containers: container ID. For application instances,
+   * application ID.
    */
   public static final String YARN_ID = "yarn:id";
   public static final String YARN_PERSISTENCE = "yarn:persistence";
+  public static final String YARN_PATH = "yarn:path";
+  public static final String YARN_HOSTNAME = "yarn:hostname";
+  public static final String YARN_IP = "yarn:ip";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
new file mode 100644
index 0000000..e6a1b5b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ApplicationServiceRecordProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.Type;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A processor for generating application DNS records from registry service
+ * records.
+ */
+public class ApplicationServiceRecordProcessor extends
+    BaseServiceRecordProcessor {
+
+  /**
+   * Create an application service record processor.
+   *
+   * @param record       the service record
+   * @param path         the service record registry node path
+   * @param domain       the DNS zone/domain name
+   * @param zoneSelector returns the zone associated with the provided name.
+   * @throws Exception  if an issue is generated during instantiation.
+   */
+  public ApplicationServiceRecordProcessor(
+      ServiceRecord record, String path, String domain,
+      ZoneSelector zoneSelector) throws Exception {
+    super(record, path, domain, zoneSelector);
+  }
+
+  /**
+   * Initializes the DNS record type to descriptor mapping based on the
+   * provided service record.
+   *
+   * @param serviceRecord the registry service record.
+   * @throws Exception if an issue is encountered.
+   */
+  @Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
+      throws Exception {
+    for (int type : getRecordTypes()) {
+      switch (type) {
+      case Type.A:
+        createAInfo(serviceRecord);
+        break;
+      case Type.AAAA:
+        createAAAAInfo(serviceRecord);
+        break;
+      case Type.TXT:
+        createTXTInfo(serviceRecord);
+        break;
+      case Type.CNAME:
+        createCNAMEInfo(serviceRecord);
+        break;
+      case Type.SRV:
+        createSRVInfo(serviceRecord);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+
+      }
+    }
+  }
+
+  /**
+   * Create an application TXT record descriptor.
+   *
+   * @param serviceRecord the service record.
+   * @throws Exception if there is an issue during descriptor creation.
+   */
+  protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
+    List<Endpoint> endpoints = serviceRecord.external;
+    List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+    TXTApplicationRecordDescriptor txtInfo;
+    for (Endpoint endpoint : endpoints) {
+      txtInfo = new TXTApplicationRecordDescriptor(
+          serviceRecord, endpoint);
+      recordDescriptors.add(txtInfo);
+    }
+    registerRecordDescriptor(Type.TXT, recordDescriptors);
+  }
+
+  /**
+   * Create an application SRV record descriptor.
+   *
+   * @param serviceRecord the service record.
+   * @throws Exception if there is an issue during descriptor creation.
+   */
+  protected void createSRVInfo(ServiceRecord serviceRecord) throws Exception {
+    List<Endpoint> endpoints = serviceRecord.external;
+    List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+    SRVApplicationRecordDescriptor srvInfo;
+    for (Endpoint endpoint : endpoints) {
+      srvInfo = new SRVApplicationRecordDescriptor(
+          serviceRecord, endpoint);
+      recordDescriptors.add(srvInfo);
+    }
+    registerRecordDescriptor(Type.SRV, recordDescriptors);
+  }
+
+  /**
+   * Create an application CNAME record descriptor.
+   *
+   * @param serviceRecord the service record.
+   * @throws Exception if there is an issue during descriptor creation.
+   */
+  protected void createCNAMEInfo(ServiceRecord serviceRecord) throws Exception {
+    List<Endpoint> endpoints = serviceRecord.external;
+    List<RecordDescriptor> recordDescriptors = new ArrayList<>();
+    CNAMEApplicationRecordDescriptor cnameInfo;
+    for (Endpoint endpoint : endpoints) {
+      cnameInfo = new CNAMEApplicationRecordDescriptor(
+          serviceRecord, endpoint);
+      recordDescriptors.add(cnameInfo);
+    }
+    registerRecordDescriptor(Type.CNAME, recordDescriptors);
+  }
+
+  /**
+   * Create an application AAAA record descriptor.
+   *
+   * @param record the service record.
+   * @throws Exception if there is an issue during descriptor creation.
+   */
+  protected void createAAAAInfo(ServiceRecord record)
+      throws Exception {
+    AAAAApplicationRecordDescriptor
+        recordInfo = new AAAAApplicationRecordDescriptor(
+        getPath(), record);
+    registerRecordDescriptor(Type.AAAA, recordInfo);
+  }
+
+  /**
+   * Create an application A record descriptor.
+   *
+   * @param record the service record.
+   * @throws Exception if there is an issue during descriptor creation.
+   */
+  protected void createAInfo(ServiceRecord record) throws Exception {
+    AApplicationRecordDescriptor recordInfo = new AApplicationRecordDescriptor(
+        getPath(), record);
+    registerRecordDescriptor(Type.A, recordInfo);
+  }
+
+  /**
+   * Returns the record types associated with a container service record.
+   *
+   * @return the record type array
+   */
+  @Override public int[] getRecordTypes() {
+    return new int[] {Type.A, Type.AAAA, Type.CNAME, Type.SRV, Type.TXT};
+  }
+
+  /**
+   * An application TXT record descriptor.
+   */
+  class TXTApplicationRecordDescriptor
+      extends ApplicationRecordDescriptor<List<String>> {
+
+    /**
+     * Creates an application TXT record descriptor.
+     *
+     * @param record service record
+     * @throws Exception
+     */
+    public TXTApplicationRecordDescriptor(ServiceRecord record,
+        Endpoint endpoint) throws Exception {
+      super(record, endpoint);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     *
+     * @param serviceRecord the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord)
+        throws Exception {
+      if (getEndpoint() != null) {
+        this.setNames(new Name[] {getServiceName(), getEndpointName()});
+        this.setTarget(getTextRecords(getEndpoint()));
+      }
+    }
+
+  }
+
+  /**
+   * An application SRV record descriptor.
+   */
+  class SRVApplicationRecordDescriptor extends
+      ApplicationRecordDescriptor<RecordCreatorFactory.HostPortInfo> {
+
+    /**
+     * Creates an application SRV record descriptor.
+     *
+     * @param record service record
+     * @throws Exception
+     */
+    public SRVApplicationRecordDescriptor(ServiceRecord record,
+        Endpoint endpoint) throws Exception {
+      super(record, endpoint);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     *
+     * @param serviceRecord the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord)
+        throws Exception {
+      if (getEndpoint() != null) {
+        this.setNames(new Name[] {getServiceName(), getEndpointName()});
+        this.setTarget(new RecordCreatorFactory.HostPortInfo(
+            Name.fromString(getHost(getEndpoint()) + "."), getPort(
+            getEndpoint())));
+      }
+    }
+
+  }
+
+  /**
+   * An application CNAME record descriptor.
+   */
+  class CNAMEApplicationRecordDescriptor extends
+      ApplicationRecordDescriptor<Name> {
+
+    /**
+     * Creates an application CNAME record descriptor.
+     *
+     * @param path   registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public CNAMEApplicationRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(record);
+    }
+
+    /**
+     * Creates an application CNAME record descriptor.  This descriptor is the
+     * source for API related CNAME records.
+     *
+     * @param record   service record
+     * @param endpoint the API endpoint
+     * @throws Exception
+     */
+    public CNAMEApplicationRecordDescriptor(ServiceRecord record,
+        Endpoint endpoint) throws Exception {
+      super(record, endpoint);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     *
+     * @param serviceRecord the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord)
+        throws Exception {
+      if (getEndpoint() != null) {
+        this.setNames(new Name[] {getEndpointName()});
+        this.setTarget(getServiceName());
+      }
+    }
+
+  }
+
+  /**
+   * An application A record descriptor.
+   */
+  class AApplicationRecordDescriptor
+      extends ApplicationRecordDescriptor<InetAddress> {
+
+    /**
+     * Creates an application A record descriptor.
+     *
+     * @param path   registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public AApplicationRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     *
+     * @param serviceRecord the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord)
+        throws Exception {
+      this.setNames(new Name[] {getServiceName()});
+      List<Endpoint> endpoints = serviceRecord.external;
+      // TODO:  do we need a "hostname" attribute for an application record or
+      // can we rely on the first endpoint record.
+      this.setTarget(InetAddress.getByName(
+          getHost(endpoints.get(0))));
+    }
+
+  }
+
+  /**
+   * An application AAAA record descriptor.
+   */
+  class AAAAApplicationRecordDescriptor extends AApplicationRecordDescriptor {
+
+    /**
+     * Creates an application AAAA record descriptor.
+     *
+     * @param path   registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public AAAAApplicationRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(path, record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     *
+     * @param serviceRecord the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord)
+        throws Exception {
+      super.init(serviceRecord);
+      try {
+        this.setTarget(getIpv6Address(getTarget()));
+      } catch (UnknownHostException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
new file mode 100644
index 0000000..1289fb3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/BaseServiceRecordProcessor.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.ReverseMap;
+import org.xbill.DNS.TextParseException;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Provides common service record processing logic.
+ */
+public abstract class BaseServiceRecordProcessor
+    implements ServiceRecordProcessor {
+
+  private final ZoneSelector zoneSelctor;
+  private Map<Integer, List<RecordDescriptor>> typeToDescriptorMap =
+      new HashMap<>();
+  private String path;
+  private String domain;
+
+  private static final Pattern USER_NAME = Pattern.compile("/users/(\\w*)/?");
+  private static final String SLIDER_API_PREFIX =
+      "classpath:org.apache.slider.";
+  private static final String HTTP_API_TYPE = "http://";
+
+  /**
+   * Creates a service record processor.
+   *
+   * @param record       the service record.
+   * @param path         the node path for the record in the registry.
+   * @param domain       the target DNS domain for the service record
+   *                     associated DNS records.
+   * @param zoneSelector A selector of the best zone for a given DNS name.
+   * @throws Exception if an issue is generated during instantiation.
+   */
+  public BaseServiceRecordProcessor(ServiceRecord record, String path,
+      String domain, ZoneSelector zoneSelector)
+      throws Exception {
+    this.setPath(path);
+    this.domain = domain;
+    this.zoneSelctor = zoneSelector;
+    initTypeToInfoMapping(record);
+  }
+
+  /**
+   * Return the username found in the ZK path.
+   *
+   * @param recPath the ZK recPath.
+   * @return the user name.
+   */
+  protected String getUsername(String recPath) {
+    String user = "anonymous";
+    Matcher matcher = USER_NAME.matcher(recPath);
+    if (matcher.find()) {
+      user = matcher.group(1);
+    }
+    return user;
+  }
+
+  /**
+   * Return the IPv6 mapped address for the provided IPv4 address. Utilized
+   * to create corresponding AAAA records.
+   *
+   * @param address the IPv4 address.
+   * @return the mapped IPv6 address.
+   * @throws UnknownHostException
+   */
+  static InetAddress getIpv6Address(InetAddress address)
+      throws UnknownHostException {
+    String[] octets = address.getHostAddress().split("\\.");
+    byte[] octetBytes = new byte[4];
+    for (int i = 0; i < 4; ++i) {
+      octetBytes[i] = (byte) Integer.parseInt(octets[i]);
+    }
+
+    byte[] ipv4asIpV6addr = new byte[16];
+    ipv4asIpV6addr[10] = (byte) 0xff;
+    ipv4asIpV6addr[11] = (byte) 0xff;
+    ipv4asIpV6addr[12] = octetBytes[0];
+    ipv4asIpV6addr[13] = octetBytes[1];
+    ipv4asIpV6addr[14] = octetBytes[2];
+    ipv4asIpV6addr[15] = octetBytes[3];
+
+    return Inet6Address.getByAddress(null, ipv4asIpV6addr, 0);
+  }
+
+  /**
+   * Reverse the string representation of the input IP address.
+   *
+   * @param ip the string representation of the IP address.
+   * @return the reversed IP address.
+   * @throws UnknownHostException if the ip is unknown.
+   */
+  protected Name reverseIP(String ip) throws UnknownHostException {
+    return ReverseMap.fromAddress(ip);
+  }
+
+  /**
+   * Manages the creation and registration of service record generated DNS
+   * records.
+   *
+   * @param command the DNS registration command object (e.g. add_record,
+   *                remove record)
+   * @throws IOException if the creation or registration generates an issue.
+   */
+  @SuppressWarnings({"unchecked"})
+  public void manageDNSRecords(RegistryDNS.RegistryCommand command)
+      throws IOException {
+    for (Map.Entry<Integer, List<RecordDescriptor>> entry :
+        typeToDescriptorMap.entrySet()) {
+      for (RecordDescriptor recordDescriptor : entry.getValue()) {
+        for (Name name : recordDescriptor.getNames()) {
+          RecordCreatorFactory.RecordCreator recordCreator =
+              RecordCreatorFactory.getRecordCreator(entry.getKey());
+          command.exec(zoneSelctor.findBestZone(name),
+              recordCreator.create(name, recordDescriptor.getTarget()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the DNS record descriptor object to the record type to descriptor
+   * mapping.
+   *
+   * @param type             the DNS record type.
+   * @param recordDescriptor the DNS record descriptor
+   */
+  protected void registerRecordDescriptor(int type,
+      RecordDescriptor recordDescriptor) {
+    List<RecordDescriptor> infos = new ArrayList<>();
+    infos.add(recordDescriptor);
+    typeToDescriptorMap.put(type, infos);
+  }
+
+  /**
+   * Add the DNS record descriptor objects to the record type to descriptor
+   * mapping.
+   *
+   * @param type              the DNS record type.
+   * @param recordDescriptors the DNS record descriptors
+   */
+  protected void registerRecordDescriptor(int type,
+      List<RecordDescriptor> recordDescriptors) {
+    typeToDescriptorMap.put(type, recordDescriptors);
+  }
+
+  /**
+   * Return the path associated with the record.
+   * @return the path.
+   */
+  protected String getPath() {
+    return path;
+  }
+
+  /**
+   * Set the path associated with the record.
+   * @param path the path.
+   */
+  protected void setPath(String path) {
+    this.path = path;
+  }
+
+  /**
+   * A descriptor container the information to be populated into a DNS record.
+   *
+   * @param <T> the DNS record type/class.
+   */
+  abstract class RecordDescriptor<T> {
+    private final ServiceRecord record;
+    private Name[] names;
+    private T target;
+
+    /**
+     * Creates a DNS record descriptor.
+     *
+     * @param record the associated service record.
+     */
+    public RecordDescriptor(ServiceRecord record) {
+      this.record = record;
+    }
+
+    /**
+     * Returns the DNS names associated with the record type and information.
+     *
+     * @return the array of names.
+     */
+    public Name[] getNames() {
+      return names;
+    }
+
+    /**
+     * Return the target object for the DNS record.
+     *
+     * @return the DNS record target.
+     */
+    public T getTarget() {
+      return target;
+    }
+
+    /**
+     * Initializes the names and information for this DNS record descriptor.
+     *
+     * @param serviceRecord the service record.
+     * @throws Exception
+     */
+    protected abstract void init(ServiceRecord serviceRecord) throws Exception;
+
+    /**
+     * Returns the service record.
+     * @return the service record.
+     */
+    public ServiceRecord getRecord() {
+      return record;
+    }
+
+    /**
+     * Sets the names associated with the record type and information.
+     * @param names the names.
+     */
+    public void setNames(Name[] names) {
+      this.names = names;
+    }
+
+    /**
+     * Sets the target object associated with the record.
+     * @param target the target.
+     */
+    public void setTarget(T target) {
+      this.target = target;
+    }
+  }
+
+  /**
+   * A container-based DNS record descriptor.
+   *
+   * @param <T> the DNS record type/class.
+   */
+  abstract class ContainerRecordDescriptor<T> extends RecordDescriptor<T> {
+
+    public ContainerRecordDescriptor(String path, ServiceRecord record)
+        throws Exception {
+      super(record);
+      init(record);
+    }
+
+    /**
+     * Returns the DNS name constructed from the YARN container ID.
+     *
+     * @return the container ID name.
+     * @throws TextParseException
+     */
+    protected Name getContainerIDName() throws TextParseException {
+      String containerID = RegistryPathUtils.lastPathEntry(getPath());
+      containerID = containerID.replace("container", "ctr");
+      return Name.fromString(String.format("%s.%s", containerID, domain));
+    }
+
+    /**
+     * Returns the DNS name constructed from the container role/component name.
+     *
+     * @return the DNS naem.
+     * @throws PathNotFoundException
+     * @throws TextParseException
+     */
+    protected Name getContainerName()
+        throws PathNotFoundException, TextParseException {
+      String service = RegistryPathUtils.lastPathEntry(
+          RegistryPathUtils.parentOf(RegistryPathUtils.parentOf(getPath())));
+      String description = getRecord().description.toLowerCase();
+      String user = getUsername(getPath());
+      return Name.fromString(MessageFormat.format("{0}.{1}.{2}.{3}",
+          description,
+          service,
+          user,
+          domain));
+    }
+
+  }
+
+  /**
+   * An application-based DNS record descriptor.
+   *
+   * @param <T> the DNS record type/class.
+   */
+  abstract class ApplicationRecordDescriptor<T> extends RecordDescriptor<T> {
+
+    private Endpoint srEndpoint;
+
+    /**
+     * Creates an application associated DNS record descriptor.
+     *
+     * @param record the service record.
+     * @throws Exception
+     */
+    public ApplicationRecordDescriptor(ServiceRecord record)
+        throws Exception {
+      this(record, null);
+    }
+
+    /**
+     * Creates an application associated DNS record descriptor.  The endpoint
+     * is leverated to create an associated application API record.
+     *
+     * @param record   the service record.
+     * @param endpoint an API endpoint.
+     * @throws Exception
+     */
+    public ApplicationRecordDescriptor(ServiceRecord record,
+        Endpoint endpoint) throws Exception {
+      super(record);
+      this.setEndpoint(endpoint);
+      init(record);
+    }
+
+    /**
+     * Get the service's DNS name for registration.
+     *
+     * @return the service DNS name.
+     * @throws TextParseException
+     */
+    protected Name getServiceName() throws TextParseException {
+      String user = getUsername(getPath());
+      String service =
+          String.format("%s.%s.%s",
+              RegistryPathUtils.lastPathEntry(getPath()),
+              user,
+              domain);
+      return Name.fromString(service);
+    }
+
+    /**
+     * Get the host from the provided endpoint record.
+     *
+     * @param endpoint the endpoint info.
+     * @return the host name.
+     */
+    protected String getHost(Endpoint endpoint) {
+      String host = null;
+      // assume one address for now
+      Map<String, String> address = endpoint.addresses.get(0);
+      if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
+        host = address.get(AddressTypes.ADDRESS_HOSTNAME_FIELD);
+      } else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+        URI uri = URI.create(address.get("uri"));
+        host = uri.getHost();
+      }
+      return host;
+    }
+
+    /**
+     * Get the post from the provided endpoint record.
+     *
+     * @param endpoint the endpoint info.
+     * @return the port.
+     */
+    protected int getPort(Endpoint endpoint) {
+      int port = -1;
+      // assume one address for now
+      Map<String, String> address = endpoint.addresses.get(0);
+      if (endpoint.addressType.equals(AddressTypes.ADDRESS_HOSTNAME_AND_PORT)) {
+        port = Integer.parseInt(address.get(AddressTypes.ADDRESS_PORT_FIELD));
+      } else if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+        URI uri = URI.create(address.get("uri"));
+        port = uri.getPort();
+      }
+      return port;
+    }
+
+    /**
+     * Get the list of strings that can be related in a TXT record for the given
+     * endpoint.
+     *
+     * @param endpoint the endpoint information.
+     * @return the list of strings relating endpoint info.
+     */
+    protected List<String> getTextRecords(Endpoint endpoint) {
+      Map<String, String> address = endpoint.addresses.get(0);
+      List<String> txtRecs = new ArrayList<String>();
+      txtRecs.add("api=" + getDNSApiFragment(endpoint.api));
+      if (endpoint.addressType.equals(AddressTypes.ADDRESS_URI)) {
+        URI uri = URI.create(address.get("uri"));
+        txtRecs.add("path=" + uri.getPath());
+      }
+      return txtRecs;
+    }
+
+    /**
+     * Get an API name that is compatible with DNS standards (and shortened).
+     *
+     * @param api the api indicator.
+     * @return the shortened and compatible api name.
+     */
+    protected String getDNSApiFragment(String api) {
+      String dnsApi = null;
+      if (api.startsWith(SLIDER_API_PREFIX)) {
+        dnsApi = api.substring(SLIDER_API_PREFIX.length());
+      } else if (api.startsWith(HTTP_API_TYPE)) {
+        dnsApi = "http";
+      }
+      assert dnsApi != null;
+      dnsApi = dnsApi.replace('.', '-');
+      return dnsApi;
+    }
+
+    /**
+     * Return the DNS name associated with the API endpoint.
+     *
+     * @return the name.
+     * @throws TextParseException
+     */
+    protected Name getEndpointName() throws TextParseException {
+      return Name.fromString(String.format("%s-api.%s",
+          getDNSApiFragment(
+              getEndpoint().api),
+          getServiceName()));
+    }
+
+    /**
+     * Returns the endpoint.
+     * @return the endpoint.
+     */
+    public Endpoint getEndpoint() {
+      return srEndpoint;
+    }
+
+    /**
+     * Sets the endpoint.
+     * @param endpoint the endpoint.
+     */
+    public void setEndpoint(
+        Endpoint endpoint) {
+      this.srEndpoint = endpoint;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
new file mode 100644
index 0000000..75873d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ContainerServiceRecordProcessor.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.TextParseException;
+import org.xbill.DNS.Type;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A processor for generating container DNS records from registry service
+ * records.
+ */
+public class ContainerServiceRecordProcessor extends
+    BaseServiceRecordProcessor {
+
+  /**
+   * Create a container service record processor.
+   * @param record the service record
+   * @param path the service record registry node path
+   * @param domain the DNS zone/domain name
+   * @param zoneSelector returns the zone associated with the provided name.
+   * @throws Exception if an issue is generated during instantiation.
+   */
+  public ContainerServiceRecordProcessor(
+      ServiceRecord record, String path, String domain,
+      ZoneSelector zoneSelector) throws Exception {
+    super(record, path, domain, zoneSelector);
+  }
+
+  /**
+   * Initializes the DNS record type to descriptor mapping based on the
+   * provided service record.
+   * @param serviceRecord  the registry service record.
+   * @throws Exception if an issue arises.
+   */
+  @Override public void initTypeToInfoMapping(ServiceRecord serviceRecord)
+      throws Exception {
+    if (serviceRecord.get(YarnRegistryAttributes.YARN_IP) != null) {
+      for (int type : getRecordTypes()) {
+        switch (type) {
+        case Type.A:
+          createAInfo(serviceRecord);
+          break;
+        case Type.AAAA:
+          createAAAAInfo(serviceRecord);
+          break;
+        case Type.PTR:
+          createPTRInfo(serviceRecord);
+          break;
+        case Type.TXT:
+          createTXTInfo(serviceRecord);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown type " + type);
+
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a container TXT record descriptor.
+   * @param serviceRecord the service record.
+   * @throws Exception if the descriptor creation yields an issue.
+   */
+  protected void createTXTInfo(ServiceRecord serviceRecord) throws Exception {
+    TXTContainerRecordDescriptor txtInfo =
+        new TXTContainerRecordDescriptor(getPath(), serviceRecord);
+    registerRecordDescriptor(Type.TXT, txtInfo);
+  }
+
+  /**
+   * Creates a container PTR record descriptor.
+   * @param record the service record.
+   * @throws Exception if the descriptor creation yields an issue.
+   */
+  protected void createPTRInfo(ServiceRecord record) throws Exception {
+    PTRContainerRecordDescriptor
+        ptrInfo = new PTRContainerRecordDescriptor(getPath(), record);
+    registerRecordDescriptor(Type.PTR, ptrInfo);
+  }
+
+  /**
+   * Creates a container AAAA (IPv6) record descriptor.
+   * @param record the service record
+   * @throws Exception if the descriptor creation yields an issue.
+   */
+  protected void createAAAAInfo(ServiceRecord record)
+      throws Exception {
+    AAAAContainerRecordDescriptor
+        recordInfo = new AAAAContainerRecordDescriptor(
+        getPath(), record);
+    registerRecordDescriptor(Type.AAAA, recordInfo);
+  }
+
+  /**
+   * Creates a container A (IPv4) record descriptor.
+   * @param record service record.
+   * @throws Exception if the descriptor creation yields an issue.
+   */
+  protected void createAInfo(ServiceRecord record) throws Exception {
+    AContainerRecordDescriptor recordInfo = new AContainerRecordDescriptor(
+        getPath(), record);
+    registerRecordDescriptor(Type.A, recordInfo);
+  }
+
+  /**
+   * Returns the record types associated with a container service record.
+   * @return the record type array
+   */
+  @Override public int[] getRecordTypes() {
+    return new int[] {Type.A, Type.AAAA, Type.PTR, Type.TXT};
+  }
+
+  /**
+   * A container TXT record descriptor.
+   */
+  class TXTContainerRecordDescriptor
+      extends ContainerRecordDescriptor<List<String>> {
+
+    /**
+     * Creates a container TXT record descriptor.
+     * @param path registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public TXTContainerRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(path, record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     * @param serviceRecord  the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord) {
+      try {
+        this.setNames(new Name[] {getContainerIDName()});
+      } catch (TextParseException e) {
+        // log
+      }
+      List<String> txts = new ArrayList<>();
+      txts.add("id=" + serviceRecord.get(YarnRegistryAttributes.YARN_ID));
+      this.setTarget(txts);
+    }
+
+  }
+
+  /**
+   * A container PTR record descriptor.
+   */
+  class PTRContainerRecordDescriptor extends ContainerRecordDescriptor<Name> {
+
+    /**
+     * Creates a container PTR record descriptor.
+     * @param path registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public PTRContainerRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(path, record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     * @param serviceRecord  the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord) {
+      String host = serviceRecord.get(YarnRegistryAttributes.YARN_HOSTNAME);
+      String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
+      Name reverseLookupName = null;
+      if (host != null && ip != null) {
+        try {
+          reverseLookupName = reverseIP(ip);
+        } catch (UnknownHostException e) {
+          //LOG
+        }
+      }
+      this.setNames(new Name[] {reverseLookupName});
+      try {
+        this.setTarget(getContainerIDName());
+      } catch (TextParseException e) {
+        //LOG
+      }
+    }
+
+  }
+
+
+  /**
+   * A container A record descriptor.
+   */
+  class AContainerRecordDescriptor
+      extends ContainerRecordDescriptor<InetAddress> {
+
+    /**
+     * Creates a container A record descriptor.
+     * @param path registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public AContainerRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(path, record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     * @param serviceRecord  the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord) {
+      String ip = serviceRecord.get(YarnRegistryAttributes.YARN_IP);
+      if (ip == null) {
+        throw new IllegalArgumentException("No IP specified");
+      }
+      try {
+        this.setTarget(InetAddress.getByName(ip));
+        this.setNames(new Name[] {getContainerName(), getContainerIDName()});
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+
+    }
+
+  }
+
+  /**
+   * A container AAAA record descriptor.
+   */
+  class AAAAContainerRecordDescriptor extends AContainerRecordDescriptor {
+
+    /**
+     * Creates a container AAAA record descriptor.
+     * @param path registry path for service record
+     * @param record service record
+     * @throws Exception
+     */
+    public AAAAContainerRecordDescriptor(String path,
+        ServiceRecord record) throws Exception {
+      super(path, record);
+    }
+
+    /**
+     * Initializes the descriptor parameters.
+     * @param serviceRecord  the service record.
+     */
+    @Override protected void init(ServiceRecord serviceRecord) {
+      super.init(serviceRecord);
+      try {
+        this.setTarget(getIpv6Address(getTarget()));
+      } catch (UnknownHostException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f3c3a22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
new file mode 100644
index 0000000..23f9501
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RecordCreatorFactory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import org.xbill.DNS.AAAARecord;
+import org.xbill.DNS.ARecord;
+import org.xbill.DNS.CNAMERecord;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.PTRRecord;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SRVRecord;
+import org.xbill.DNS.TXTRecord;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import static org.xbill.DNS.Type.*;
+
+/**
+ * A factory for creating DNS records.
+ */
+public final class RecordCreatorFactory {
+  private static long ttl;
+
+  /**
+   * Private constructor.
+   */
+  private RecordCreatorFactory() {
+  }
+
+  /**
+   * Returns the DNS record creator for the provided type.
+   *
+   * @param type the DNS record type.
+   * @return the record creator.
+   */
+  static RecordCreator getRecordCreator(int type) {
+    switch (type) {
+    case A:
+      return new ARecordCreator();
+    case CNAME:
+      return new CNAMERecordCreator();
+    case TXT:
+      return new TXTRecordCreator();
+    case AAAA:
+      return new AAAARecordCreator();
+    case PTR:
+      return new PTRRecordCreator();
+    case SRV:
+      return new SRVRecordCreator();
+    default:
+      throw new IllegalArgumentException("No type " + type);
+
+    }
+  }
+
+  /**
+   * Set the TTL value for the records created by the factory.
+   *
+   * @param ttl the ttl value, in seconds.
+   */
+  public static void setTtl(long ttl) {
+    RecordCreatorFactory.ttl = ttl;
+  }
+
+  /**
+   * A DNS Record creator.
+   *
+   * @param <R> the record type
+   * @param <T> the record's target type
+   */
+  public interface RecordCreator<R extends Record, T> {
+    R create(Name name, T target);
+  }
+
+  /**
+   * An A Record creator.
+   */
+  static class ARecordCreator implements RecordCreator<ARecord, InetAddress> {
+    /**
+     * Creates an A record creator.
+     */
+    public ARecordCreator() {
+    }
+
+    /**
+     * Creates a DNS A record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public ARecord create(Name name, InetAddress target) {
+      return new ARecord(name, DClass.IN, ttl, target);
+    }
+  }
+
+  /**
+   * An AAAA Record creator.
+   */
+  static class AAAARecordCreator
+      implements RecordCreator<AAAARecord, InetAddress> {
+    /**
+     * Creates an AAAA record creator.
+     */
+    public AAAARecordCreator() {
+    }
+
+    /**
+     * Creates a DNS AAAA record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public AAAARecord create(Name name, InetAddress target) {
+      return new AAAARecord(name, DClass.IN, ttl, target);
+    }
+  }
+
+  static class CNAMERecordCreator implements RecordCreator<CNAMERecord, Name> {
+    /**
+     * Creates a CNAME record creator.
+     */
+    public CNAMERecordCreator() {
+    }
+
+    /**
+     * Creates a DNS CNAME record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public CNAMERecord create(Name name, Name target) {
+      return new CNAMERecord(name, DClass.IN, ttl, target);
+    }
+  }
+
+  /**
+   * A TXT Record creator.
+   */
+  static class TXTRecordCreator
+      implements RecordCreator<TXTRecord, List<String>> {
+    /**
+     * Creates a TXT record creator.
+     */
+    public TXTRecordCreator() {
+    }
+
+    /**
+     * Creates a DNS TXT record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public TXTRecord create(Name name, List<String> target) {
+      return new TXTRecord(name, DClass.IN, ttl, target);
+    }
+  }
+
+  /**
+   * A PTR Record creator.
+   */
+  static class PTRRecordCreator implements RecordCreator<PTRRecord, Name> {
+    /**
+     * Creates a PTR record creator.
+     */
+    public PTRRecordCreator() {
+    }
+
+    /**
+     * Creates a DNS PTR record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public PTRRecord create(Name name, Name target) {
+      return new PTRRecord(name, DClass.IN, ttl, target);
+    }
+  }
+
+  /**
+   * A SRV Record creator.
+   */
+  static class SRVRecordCreator
+      implements RecordCreator<SRVRecord, HostPortInfo> {
+    /**
+     * Creates a SRV record creator.
+     */
+    public SRVRecordCreator() {
+    }
+
+    /**
+     * Creates a DNS SRV record.
+     *
+     * @param name   the record name.
+     * @param target the record target/value.
+     * @return an A record.
+     */
+    @Override public SRVRecord create(Name name, HostPortInfo target) {
+      return new SRVRecord(name, DClass.IN, ttl, 1, 1, target.getPort(),
+          target.getHost());
+    }
+  }
+
+  /**
+   * An object for storing the host and port info used to generate SRV records.
+   */
+  public static class HostPortInfo {
+    private Name host;
+    private int port;
+
+    /**
+     * Creates an object with a host and port pair.
+     *
+     * @param host the hostname/ip
+     * @param port the port value
+     */
+    public HostPortInfo(Name host, int port) {
+      this.setHost(host);
+      this.setPort(port);
+    }
+
+    /**
+     * Return the host name.
+     * @return the host name.
+     */
+    Name getHost() {
+      return host;
+    }
+
+    /**
+     * Set the host name.
+     * @param host the host name.
+     */
+    void setHost(Name host) {
+      this.host = host;
+    }
+
+    /**
+     * Get the port.
+     * @return the port.
+     */
+    int getPort() {
+      return port;
+    }
+
+    /**
+     * Set the port.
+     * @param port the port.
+     */
+    void setPort(int port) {
+      this.port = port;
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org