You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/11/06 21:22:37 UTC
[1/3] hadoop git commit: YARN-2768 Improved Yarn Registry service
record structure (stevel)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 0f9199fb0 -> 5924e74d5
refs/heads/branch-2.6 b557f689b -> e333584ca
refs/heads/trunk f5b19bed7 -> 167057801
YARN-2768 Improved Yarn Registry service record structure (stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5924e74d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5924e74d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5924e74d
Branch: refs/heads/branch-2
Commit: 5924e74d550b3ac5e5d65c2fc80275095de1c0e1
Parents: 0f9199f
Author: Steve Loughran <st...@apache.org>
Authored: Thu Nov 6 20:21:25 2014 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Nov 6 20:21:25 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../apache/hadoop/registry/cli/RegistryCli.java | 25 +-
.../registry/client/binding/JsonSerDeser.java | 142 +++++------
.../client/binding/RegistryTypeUtils.java | 166 ++++++++-----
.../registry/client/binding/RegistryUtils.java | 7 +-
.../client/exceptions/NoRecordException.java | 10 +-
.../impl/zk/RegistryOperationsService.java | 12 +-
.../registry/client/types/AddressTypes.java | 2 +
.../hadoop/registry/client/types/Endpoint.java | 131 ++++++++---
.../registry/client/types/ProtocolTypes.java | 7 +-
.../registry/client/types/ServiceRecord.java | 26 +--
.../client/types/ServiceRecordHeader.java | 59 -----
.../src/main/tla/yarnregistry.tla | 94 ++++++--
.../hadoop/registry/RegistryTestHelper.java | 36 ++-
.../client/binding/TestMarshalling.java | 72 ++++--
.../operations/TestRegistryOperations.java | 5 +-
.../src/site/markdown/registry/yarn-registry.md | 233 +++++++++++++++----
17 files changed, 623 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8335d2b..6689d89 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -710,6 +710,8 @@ Release 2.6.0 - UNRELEASED
YARN-2677 registry punycoding of usernames doesn't fix all usernames to be
DNS-valid (stevel)
+ YARN-2768 Improved Yarn Registry service record structure (stevel)
+
---
YARN-2598 GHS should show N/A instead of null for the inaccessible information
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
index 863039e..bf2b5e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
@@ -174,24 +175,22 @@ public class RegistryCli extends Configured implements Tool {
ServiceRecord record = registry.resolve(argsList.get(1));
for (Endpoint endpoint : record.external) {
- if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI))
- || (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) {
- sysout.print(" Endpoint(ProtocolType="
- + endpoint.protocolType + ", Api="
- + endpoint.api + "); Uris are: ");
- } else {
- sysout.print(" Endpoint(ProtocolType="
+ sysout.println(" Endpoint(ProtocolType="
+ endpoint.protocolType + ", Api="
+ endpoint.api + ");"
+ " Addresses(AddressType="
+ endpoint.addressType + ") are: ");
- }
- for (List<String> a : endpoint.addresses) {
- sysout.print(a + " ");
- }
- sysout.println();
- }
+ for (Map<String, String> address : endpoint.addresses) {
+ sysout.println(" [ ");
+ for (Map.Entry<String, String> entry : address.entrySet()) {
+ sysout.println(" " + entry.getKey()
+ + ": \"" + entry.getValue() + "\"");
+ }
+ sysout.println(" ]");
+ }
+ sysout.println();
+ }
return 0;
} catch (Exception e) {
syserr.println(analyzeException("resolve", e, argsList));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
index e086e36..af4e4f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.registry.client.binding;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,8 +46,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
/**
* Support for marshalling objects to and from JSON.
@@ -62,30 +61,30 @@ public class JsonSerDeser<T> {
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
private static final String UTF_8 = "UTF-8";
- public static final String E_NO_SERVICE_RECORD = "No service record at path";
+ public static final String E_NO_DATA = "No data at path";
+ public static final String E_DATA_TOO_SHORT = "Data at path too short";
+ public static final String E_MISSING_MARKER_STRING =
+ "Missing marker string: ";
private final Class<T> classType;
private final ObjectMapper mapper;
- private final byte[] header;
/**
* Create an instance bound to a specific type
* @param classType class to marshall
- * @param header byte array to use as header
*/
- public JsonSerDeser(Class<T> classType, byte[] header) {
+ public JsonSerDeser(Class<T> classType) {
Preconditions.checkArgument(classType != null, "null classType");
- Preconditions.checkArgument(header != null, "null header");
this.classType = classType;
this.mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- // make an immutable copy to keep findbugs happy.
- byte[] h = new byte[header.length];
- System.arraycopy(header, 0, h, 0, header.length);
- this.header = h;
}
+ /**
+ * Get the simple name of the class type to be marshalled
+ * @return the name of the class being marshalled
+ */
public String getName() {
return classType.getSimpleName();
}
@@ -183,7 +182,7 @@ public class JsonSerDeser<T> {
if (count != len) {
throw new EOFException(path.toString() + ": read finished prematurely");
}
- return fromBytes(path.toString(), b, 0);
+ return fromBytes(path.toString(), b);
}
/**
@@ -206,8 +205,7 @@ public class JsonSerDeser<T> {
* @throws IOException on any failure
*/
private void writeJsonAsBytes(T instance,
- DataOutputStream dataOutputStream) throws
- IOException {
+ DataOutputStream dataOutputStream) throws IOException {
try {
byte[] b = toBytes(instance);
dataOutputStream.write(b);
@@ -228,36 +226,50 @@ public class JsonSerDeser<T> {
}
/**
- * Convert JSON To bytes, inserting the header
- * @param instance instance to convert
- * @return a byte array
- * @throws IOException
+ * Deserialize from a byte array
+ * @param path path the data came from
+ * @param bytes byte array
+ * @throws IOException all problems
+ * @throws EOFException not enough data
+ * @throws InvalidRecordException if the parsing failed -the record is invalid
*/
- public byte[] toByteswithHeader(T instance) throws IOException {
- byte[] body = toBytes(instance);
-
- ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length);
- buffer.put(header);
- buffer.put(body);
- return buffer.array();
+ public T fromBytes(String path, byte[] bytes) throws IOException,
+ InvalidRecordException {
+ return fromBytes(path, bytes, "");
}
/**
- * Deserialize from a byte array
+ * Deserialize from a byte array, optionally checking for a marker string.
+ * <p>
+ * If the marker parameter is supplied (and not empty), then its presence
+ * will be verified before the JSON parsing takes place; it is a fast-fail
+ * check. If not found, an {@link InvalidRecordException} exception will be
+ * raised
* @param path path the data came from
* @param bytes byte array
- * @return offset in the array to read from
+ * @param marker an optional string which, if set, MUST be present in the
+ * UTF-8 parsed payload.
+ * @return The parsed record
* @throws IOException all problems
* @throws EOFException not enough data
- * @throws InvalidRecordException if the parsing failed -the record is invalid
+ * @throws InvalidRecordException if the JSON parsing failed.
+ * @throws NoRecordException if the data is not considered a record: either
+ * it is too short or it did not contain the marker string.
*/
- public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
- InvalidRecordException {
- int data = bytes.length - offset;
- if (data <= 0) {
- throw new EOFException("No data at " + path);
+ public T fromBytes(String path, byte[] bytes, String marker)
+ throws IOException, NoRecordException, InvalidRecordException {
+ int len = bytes.length;
+ if (len == 0 ) {
+ throw new NoRecordException(path, E_NO_DATA);
+ }
+ if (StringUtils.isNotEmpty(marker) && len < marker.length()) {
+ throw new NoRecordException(path, E_DATA_TOO_SHORT);
+ }
+ String json = new String(bytes, 0, len, UTF_8);
+ if (StringUtils.isNotEmpty(marker)
+ && !json.contains(marker)) {
+ throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker);
}
- String json = new String(bytes, offset, data, UTF_8);
try {
return fromJson(json);
} catch (JsonProcessingException e) {
@@ -266,52 +278,7 @@ public class JsonSerDeser<T> {
}
/**
- * Read from a byte array to a type, checking the header first
- * @param path source of data
- * @param buffer buffer
- * @return the parsed structure
- * Null if the record was too short or the header did not match
- * @throws IOException on a failure
- * @throws NoRecordException if header checks implied there was no record
- * @throws InvalidRecordException if record parsing failed
- */
- @SuppressWarnings("unchecked")
- public T fromBytesWithHeader(String path, byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- if (hlen > 0) {
- if (blen < hlen) {
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- if (!Arrays.equals(header, magic)) {
- LOG.debug("start of entry does not match service record header at {}",
- path);
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- }
- return fromBytes(path, buffer, hlen);
- }
-
- /**
- * Check if a buffer has a header which matches this record type
- * @param buffer buffer
- * @return true if there is a match
- * @throws IOException
- */
- public boolean headerMatches(byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- boolean matches = false;
- if (blen > hlen) {
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- matches = Arrays.equals(header, magic);
- }
- return matches;
- }
-
- /**
- * Convert an object to a JSON string
+ * Convert an instance to a JSON string
* @param instance instance to convert
* @return a JSON string description
* @throws JsonParseException parse problems
@@ -324,4 +291,19 @@ public class JsonSerDeser<T> {
return mapper.writeValueAsString(instance);
}
+ /**
+ * Convert an instance to a string form for output. This is a robust
+ * operation which will convert any JSON-generating exceptions into
+ * error text.
+ * @param instance non-null instance
+ * @return a JSON string
+ */
+ public String toString(T instance) {
+ Preconditions.checkArgument(instance != null, "Null instance argument");
+ try {
+ return toJson(instance);
+ } catch (IOException e) {
+ return "Failed to convert to a string: " + e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
index b4254a3..ec59d59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
@@ -22,17 +22,19 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
-import org.apache.hadoop.registry.client.types.AddressTypes;
+import static org.apache.hadoop.registry.client.types.AddressTypes.*;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Static methods to work with registry types —primarily endpoints and the
@@ -94,79 +96,66 @@ public class RegistryTypeUtils {
Preconditions.checkArgument(protocolType != null, "null protocolType");
Preconditions.checkArgument(hostname != null, "null hostname");
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+ ADDRESS_HOSTNAME_AND_PORT,
protocolType,
- tuplelist(hostname, Integer.toString(port)));
+ hostnamePortPair(hostname, port));
}
/**
* Create an IPC endpoint
* @param api API
- * @param protobuf flag to indicate whether or not the IPC uses protocol
- * buffers
* @param address the address as a tuple of (hostname, port)
* @return the new endpoint
*/
- public static Endpoint ipcEndpoint(String api,
- boolean protobuf, List<String> address) {
- ArrayList<List<String>> addressList = new ArrayList<List<String>>();
- if (address != null) {
- addressList.add(address);
- }
+ public static Endpoint ipcEndpoint(String api, InetSocketAddress address) {
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
- protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
- : ProtocolTypes.PROTOCOL_HADOOP_IPC,
- addressList);
+ ADDRESS_HOSTNAME_AND_PORT,
+ ProtocolTypes.PROTOCOL_HADOOP_IPC,
+ address== null ? null: hostnamePortPair(address));
}
/**
- * Create a single-element list of tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form [["a","b","c"]]
- * @param t1 tuple elements
- * @return a list containing a single tuple
+ * Create a single entry map
+ * @param key map entry key
+ * @param val map entry value
+ * @return a 1 entry map.
*/
- public static List<List<String>> tuplelist(String... t1) {
- List<List<String>> outer = new ArrayList<List<String>>();
- outer.add(tuple(t1));
- return outer;
+ public static Map<String, String> map(String key, String val) {
+ Map<String, String> map = new HashMap<String, String>(1);
+ map.put(key, val);
+ return map;
}
/**
- * Create a tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form ["a","b","c"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a URI
+ * @param uri value
+ * @return a 1 entry map.
*/
- public static List<String> tuple(String... t1) {
- return Arrays.asList(t1);
+ public static Map<String, String> uri(String uri) {
+ return map(ADDRESS_URI, uri);
}
/**
- * Create a tuples from the input, converting all to Strings in the process
- * that is, an input ("a", 7, true) is converted into a list
- * in the form ["a","7,"true"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a (hostname, port) address pair
+ * @param hostname hostname
+ * @param port port
+ * @return a 1 entry map.
*/
- public static List<String> tuple(Object... t1) {
- List<String> l = new ArrayList<String>(t1.length);
- for (Object t : t1) {
- l.add(t.toString());
- }
- return l;
+ public static Map<String, String> hostnamePortPair(String hostname, int port) {
+ Map<String, String> map =
+ map(ADDRESS_HOSTNAME_FIELD, hostname);
+ map.put(ADDRESS_PORT_FIELD, Integer.toString(port));
+ return map;
}
/**
- * Convert a socket address pair into a string tuple, (host, port).
- * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
- * @param address an address
- * @return an element for the address list
+ * Create a (hostname, port) address pair
+ * @param address socket address whose hostname and port are used for the
+ * generated address.
+ * @return a 1 entry map.
*/
- public static List<String> marshall(InetSocketAddress address) {
- return tuple(address.getHostName(), address.getPort());
+ public static Map<String, String> hostnamePortPair(InetSocketAddress address) {
+ return hostnamePortPair(address.getHostName(), address.getPort());
}
/**
@@ -199,25 +188,37 @@ public class RegistryTypeUtils {
if (epr == null) {
return null;
}
- requireAddressType(AddressTypes.ADDRESS_URI, epr);
- List<List<String>> addresses = epr.addresses;
+ requireAddressType(ADDRESS_URI, epr);
+ List<Map<String, String>> addresses = epr.addresses;
if (addresses.size() < 1) {
throw new InvalidRecordException(epr.toString(),
"No addresses in endpoint");
}
List<String> results = new ArrayList<String>(addresses.size());
- for (List<String> address : addresses) {
- if (address.size() != 1) {
- throw new InvalidRecordException(epr.toString(),
- "Address payload invalid: wrong element count: " +
- address.size());
- }
- results.add(address.get(0));
+ for (Map<String, String> address : addresses) {
+ results.add(getAddressField(address, ADDRESS_URI));
}
return results;
}
/**
+ * Get a specific field from an address -raising an exception if
+ * the field is not present
+ * @param address address to query
+ * @param field field to resolve
+ * @return the resolved value. Guaranteed to be non-null.
+ * @throws InvalidRecordException if the field did not resolve
+ */
+ public static String getAddressField(Map<String, String> address,
+ String field) throws InvalidRecordException {
+ String val = address.get(field);
+ if (val == null) {
+ throw new InvalidRecordException("", "Missing address field: " + field);
+ }
+ return val;
+ }
+
+ /**
* Get the address URLs. Guranteed to return at least one address.
* @param epr endpoint
* @return the address as a URL
@@ -237,4 +238,53 @@ public class RegistryTypeUtils {
}
return results;
}
+
+ /**
+ * Validate the record by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param record record to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateServiceRecord(String path, ServiceRecord record)
+ throws InvalidRecordException {
+ if (record == null) {
+ throw new InvalidRecordException(path, "Null record");
+ }
+ if (!ServiceRecord.RECORD_TYPE.equals(record.type)) {
+ throw new InvalidRecordException(path,
+ "invalid record type field: \"" + record.type + "\"");
+ }
+
+ if (record.external != null) {
+ for (Endpoint endpoint : record.external) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ if (record.internal != null) {
+ for (Endpoint endpoint : record.internal) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ }
+
+ /**
+ * Validate the endpoint by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param endpoint endpoint to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateEndpoint(String path, Endpoint endpoint)
+ throws InvalidRecordException {
+ if (endpoint == null) {
+ throw new InvalidRecordException(path, "Null endpoint");
+ }
+ try {
+ endpoint.validate();
+ } catch (RuntimeException e) {
+ throw new InvalidRecordException(path, e.toString());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
index 8caf400..68dc84e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -314,7 +313,7 @@ public class RegistryUtils {
Collection<RegistryPathStatus> stats) throws IOException {
Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
for (RegistryPathStatus stat : stats) {
- if (stat.size > ServiceRecordHeader.getLength()) {
+ if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
// maybe has data
String path = join(parentpath, stat.path);
try {
@@ -344,7 +343,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -362,7 +360,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -382,7 +379,7 @@ public class RegistryUtils {
*/
public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
public ServiceRecordMarshal() {
- super(ServiceRecord.class, ServiceRecordHeader.getData());
+ super(ServiceRecord.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
index 160433f..b81b9d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
@@ -21,17 +21,11 @@ package org.apache.hadoop.registry.client.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
/**
* Raised if there is no {@link ServiceRecord} resolved at the end
- * of the specified path, for reasons such as:
- * <ul>
- * <li>There wasn't enough data to contain a Service Record.</li>
- * <li>The start of the data did not match the {@link ServiceRecordHeader}
- * header.</li>
- * </ul>
- *
+ * of the specified path.
+ * <p>
* There may be valid data of some form at the end of the path, but it does
* not appear to be a Service Record.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
index 7c01bdf..271ab25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.zookeeper.CreateMode;
@@ -103,10 +105,12 @@ public class RegistryOperationsService extends CuratorService
int flags) throws IOException {
Preconditions.checkArgument(record != null, "null record");
validatePath(path);
+ // validate the record before putting it
+ RegistryTypeUtils.validateServiceRecord(path, record);
LOG.info("Bound at {} : {}", path, record);
CreateMode mode = CreateMode.PERSISTENT;
- byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
+ byte[] bytes = serviceRecordMarshal.toBytes(record);
zkSet(path, mode, bytes, getClientAcls(),
((flags & BindFlags.OVERWRITE) != 0));
}
@@ -114,7 +118,11 @@ public class RegistryOperationsService extends CuratorService
@Override
public ServiceRecord resolve(String path) throws IOException {
byte[] bytes = zkRead(path);
- return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
+
+ ServiceRecord record = serviceRecordMarshal.fromBytes(path,
+ bytes, ServiceRecord.RECORD_TYPE);
+ RegistryTypeUtils.validateServiceRecord(path, record);
+ return record;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
index 192819c..36dbf0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
@@ -38,6 +38,8 @@ public interface AddressTypes {
* </pre>
*/
public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+ public static final String ADDRESS_HOSTNAME_FIELD = "host";
+ public static final String ADDRESS_PORT_FIELD = "port";
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
index 51418d9..e4effb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
@@ -21,14 +21,16 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.JsonSerDeser;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Description of a single service/component endpoint.
@@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable {
/**
* a list of address tuples —tuples whose format depends on the address type
*/
- public List<List<String>> addresses;
+ public List<Map<String, String>> addresses;
/**
* Create an empty instance.
@@ -84,10 +86,11 @@ public final class Endpoint implements Cloneable {
this.api = that.api;
this.addressType = that.addressType;
this.protocolType = that.protocolType;
- this.addresses = new ArrayList<List<String>>(that.addresses.size());
- for (List<String> address : addresses) {
- List<String> addr2 = new ArrayList<String>(address.size());
- Collections.copy(address, addr2);
+ this.addresses = newAddresses(that.addresses.size());
+ for (Map<String, String> address : that.addresses) {
+ Map<String, String> addr2 = new HashMap<String, String>(address.size());
+ addr2.putAll(address);
+ addresses.add(addr2);
}
}
@@ -101,17 +104,83 @@ public final class Endpoint implements Cloneable {
public Endpoint(String api,
String addressType,
String protocolType,
- List<List<String>> addrs) {
+ List<Map<String, String>> addrs) {
this.api = api;
this.addressType = addressType;
this.protocolType = protocolType;
- this.addresses = new ArrayList<List<String>>();
+ this.addresses = newAddresses(0);
if (addrs != null) {
addresses.addAll(addrs);
}
}
/**
+ * Build an endpoint with an empty address list
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType) {
+ this.api = api;
+ this.addressType = addressType;
+ this.protocolType = protocolType;
+ this.addresses = newAddresses(0);
+ }
+
+ /**
+ * Build an endpoint with a single address entry.
+ * <p>
+ * This constructor is superfluous given the varags constructor is equivalent
+ * for a single element argument. However, type-erasure in java generics
+ * causes javac to warn about unchecked generic array creation. This
+ * constructor, which represents the common "one address" case, does
+ * not generate compile-time warnings.
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addr address. May be null —in which case it is not added
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String> addr) {
+ this(api, addressType, protocolType);
+ if (addr != null) {
+ addresses.add(addr);
+ }
+ }
+
+ /**
+ * Build an endpoint with a list of addresses
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addrs addresses. Null elements will be skipped
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String>...addrs) {
+ this(api, addressType, protocolType);
+ for (Map<String, String> addr : addrs) {
+ if (addr!=null) {
+ addresses.add(addr);
+ }
+ }
+ }
+
+ /**
+ * Create a new address structure of the requested size
+ * @param size size to create
+ * @return the new list
+ */
+ private List<Map<String, String>> newAddresses(int size) {
+ return new ArrayList<Map<String, String>>(size);
+ }
+
+ /**
* Build an endpoint from a list of URIs; each URI
* is ASCII-encoded and added to the list of addresses.
* @param api API name
@@ -125,40 +194,16 @@ public final class Endpoint implements Cloneable {
this.addressType = AddressTypes.ADDRESS_URI;
this.protocolType = protocolType;
- List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+ List<Map<String, String>> addrs = newAddresses(uris.length);
for (URI uri : uris) {
- addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+ addrs.add(RegistryTypeUtils.uri(uri.toString()));
}
this.addresses = addrs;
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("Endpoint{");
- sb.append("api='").append(api).append('\'');
- sb.append(", addressType='").append(addressType).append('\'');
- sb.append(", protocolType='").append(protocolType).append('\'');
-
- sb.append(", addresses=");
- if (addresses != null) {
- sb.append("[ ");
- for (List<String> address : addresses) {
- sb.append("[ ");
- if (address == null) {
- sb.append("NULL entry in address list");
- } else {
- for (String elt : address) {
- sb.append('"').append(elt).append("\" ");
- }
- }
- sb.append("] ");
- };
- sb.append("] ");
- } else {
- sb.append("(null) ");
- }
- sb.append('}');
- return sb.toString();
+ return marshalToString.toString(this);
}
/**
@@ -173,7 +218,7 @@ public final class Endpoint implements Cloneable {
Preconditions.checkNotNull(addressType, "null addressType field");
Preconditions.checkNotNull(protocolType, "null protocolType field");
Preconditions.checkNotNull(addresses, "null addresses field");
- for (List<String> address : addresses) {
+ for (Map<String, String> address : addresses) {
Preconditions.checkNotNull(address, "null element in address");
}
}
@@ -184,7 +229,19 @@ public final class Endpoint implements Cloneable {
* @throws CloneNotSupportedException
*/
@Override
- protected Object clone() throws CloneNotSupportedException {
+ public Object clone() throws CloneNotSupportedException {
return super.clone();
}
+
+
+ /**
+ * Static instance of service record marshalling
+ */
+ private static class Marshal extends JsonSerDeser<Endpoint> {
+ private Marshal() {
+ super(Endpoint.class);
+ }
+ }
+
+ private static final Marshal marshalToString = new Marshal();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
index f225cf0..b836b00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
@@ -34,16 +34,11 @@ public interface ProtocolTypes {
String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
/**
- * Classic Hadoop IPC : {@value}.
+ * Hadoop IPC, "classic" or protobuf : {@value}.
*/
String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
/**
- * Hadoop protocol buffers IPC: {@value}.
- */
- String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
-
- /**
* Corba IIOP: {@value}.
*/
String PROTOCOL_IIOP = "IIOP";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
index 378127f..9403d31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.codehaus.jackson.annotate.JsonAnyGetter;
import org.codehaus.jackson.annotate.JsonAnySetter;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -41,6 +42,17 @@ import java.util.Map;
public class ServiceRecord implements Cloneable {
/**
+ * A type string which MUST be in the serialized json. This permits
+ * fast discarding of invalid entries
+ */
+ public static final String RECORD_TYPE = "JSONServiceRecord";
+
+ /**
+ * The type field. This must be the string {@link #RECORD_TYPE}
+ */
+ public String type = RECORD_TYPE;
+
+ /**
* Description string
*/
public String description;
@@ -233,17 +245,5 @@ public class ServiceRecord implements Cloneable {
return super.clone();
}
- /**
- * Validate the record by checking for null fields and other invalid
- * conditions
- * @throws NullPointerException if a field is null when it
- * MUST be set.
- * @throws RuntimeException on invalid entries
- */
- public void validate() {
- for (Endpoint endpoint : external) {
- Preconditions.checkNotNull("null endpoint", endpoint);
- endpoint.validate();
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
deleted file mode 100644
index 2f75dba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.client.types;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Service record header; access to the byte array kept private
- * to avoid findbugs warnings of mutability
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class ServiceRecordHeader {
- /**
- * Header of a service record: "jsonservicerec"
- * By making this over 12 bytes long, we can auto-determine which entries
- * in a listing are too short to contain a record without getting their data
- */
- private static final byte[] RECORD_HEADER = {
- 'j', 's', 'o', 'n',
- 's', 'e', 'r', 'v', 'i', 'c', 'e',
- 'r', 'e', 'c'
- };
-
- /**
- * Get the length of the record header
- * @return the header length
- */
- public static int getLength() {
- return RECORD_HEADER.length;
- }
-
- /**
- * Get a clone of the record header
- * @return the new record header.
- */
- public static byte[] getData() {
- byte[] h = new byte[RECORD_HEADER.length];
- System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
- return h;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
index 1c19ade..a950475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
@@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
(*
+============================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+============================================================================
*)
(*
@@ -71,13 +73,22 @@ CONSTANTS
MknodeActions \* all possible mkdir actions
+ASSUME PathChars \in STRING
+ASSUME Paths \in STRING
+
+(* Data in records is JSON, hence a string *)
+ASSUME Data \in STRING
+
+----------------------------------------------------------------------------------------
(* the registry*)
VARIABLE registry
+
(* Sequence of actions to apply to the registry *)
VARIABLE actions
+
----------------------------------------------------------------------------------------
(* Tuple of all variables. *)
@@ -92,7 +103,6 @@ vars == << registry, actions >>
(* Persistence policy *)
PersistPolicySet == {
- "", \* Undefined; field not present. PERMANENT is implied.
"permanent", \* persists until explicitly removed
"application", \* persists until the application finishes
"application-attempt", \* persists until the application attempt finishes
@@ -104,7 +114,6 @@ TypeInvariant ==
/\ \A p \in PersistPolicies: p \in PersistPolicySet
-
----------------------------------------------------------------------------------------
@@ -129,6 +138,14 @@ RegistryEntry == [
]
+(* Define the set of all string to string mappings *)
+
+StringMap == [
+ STRING |-> STRING
+]
+
+
+
(*
An endpoint in a service record
*)
@@ -140,21 +157,14 @@ Endpoint == [
addresses: Addresses
]
-(* Attributes are the set of all string to string mappings *)
-
-Attributes == [
-STRING |-> STRING
-]
-
(*
A service record
*)
ServiceRecord == [
- \* ID -used when applying the persistence policy
- yarn_id: STRING,
- \* the persistence policy
- yarn_persistence: PersistPolicySet,
+ \* This MUST be present: if it is not then the data is not a service record
+ \* This permits shortcut scan & reject of byte arrays without parsing
+ type: "JSONServiceRecord",
\*A description
description: STRING,
@@ -166,9 +176,34 @@ ServiceRecord == [
internal: Endpoints,
\* Attributes are a function
- attributes: Attributes
+ attributes: StringMap
]
+----------------------------------------------------------------------------------------
+
+(*
+ There is an operation serialize whose internals are not defined,
+ Which converts the service records to JSON
+ *)
+
+CONSTANT serialize(_)
+
+(* A function which returns true iff the byte stream is considered a valid service record. *)
+CONSTANT containsServiceRecord(_)
+
+(* A function to deserialize a string to JSON *)
+CONSTANT deserialize(_)
+
+ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN
+
+(* Records can be serialized *)
+ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r))
+
+(* All strings for which containsServiceRecord() holds can be deserialized *)
+ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord
+
+
+
----------------------------------------------------------------------------------------
@@ -304,8 +339,8 @@ validRegistry(R) ==
\* an entry must be the root entry or have a parent entry
/\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
- \* If the entry has data, it must be a service record
- /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+ \* If the entry has data, it must contain a service record
+ /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data))
----------------------------------------------------------------------------------------
@@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff
*)
mknodeSimple(R, path) ==
- LET record == [ path |-> path, data |-> <<>> ]
+ LET entry == [ path |-> path, data |-> <<>> ]
IN \/ exists(R, path)
- \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
+ \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} ))
(*
-For all parents, the mknodeSimpl() criteria must apply.
+For all parents, the mknodeSimple() criteria must apply.
This could be defined recursively, though as TLA+ does not support recursion,
an alternative is required
@@ -350,7 +385,8 @@ an alternative is required
Because this specification is declaring the final state of a operation, not
the implemental, all that is needed is to describe those parents.
-It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+It declares that the mknodeSimple() state applies to the path and all
+its parents in the set R'
*)
mknodeWithParents(R, path) ==
@@ -402,7 +438,7 @@ purge(R, path, id, persistence) ==
=> recursiveDelete(R, p2.path)
(*
-resolveRecord() resolves the record at a path or fails.
+resolveEntry() resolves the record entry at a path or fails.
It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
is guaranteed to return the single entry of that set, iff the choice predicate holds.
@@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects
the sole entry of the resolve operation.
*)
-resolveRecord(R, path) ==
+resolveEntry(R, path) ==
LET l == resolve(R, path) IN
/\ Cardinality(l) = 1
/\ CHOOSE e \in l : TRUE
(*
+ Resolve a record by resolving the entry and deserializing the result
+ *)
+resolveRecord(R, path) ==
+ deserialize(resolveEntry(R, path))
+
+
+(*
The specific action of putting an entry into a record includes validating the record
*)
validRecordToBind(path, record) ==
\* The root entry must have permanent persistence
- isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
- \/ record.attributes["yarn:persistence"] = "")
+ isRootPath(path) => (
+ record.attributes["yarn:persistence"] = "permanent"
+ \/ record.attributes["yarn:persistence"]
+ \/ record.attributes["yarn:persistence"] = {})
(*
@@ -432,13 +477,12 @@ marshalled as the data in the entry
*)
bindRecord(R, path, record) ==
/\ validRecordToBind(path, record)
- /\ bind(R, [path |-> path, data |-> record])
+ /\ bind(R, [path |-> path, data |-> serialize(record)])
----------------------------------------------------------------------------------------
-
(*
The action queue can only contain one of the sets of action types, and
by giving each a unique name, those sets are guaranteed to be disjoint
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
index 460ecad..91602e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.registry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -46,11 +45,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*;
/**
* This is a set of static methods to aid testing the registry operations.
@@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert {
public static final String SC_HADOOP = "org-apache-hadoop";
public static final String USER = "devteam/";
public static final String NAME = "hdfs";
- public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
- public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+ public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs";
+ public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs";
public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
public static final String ENTRY_PATH = PARENT_PATH + NAME;
- public static final String NNIPC = "nnipc";
- public static final String IPC2 = "IPC2";
+ public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437";
+ public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF";
private static final Logger LOG =
LoggerFactory.getLogger(RegistryTestHelper.class);
- public static final String KTUTIL = "ktutil";
private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
new RegistryUtils.ServiceRecordMarshal();
+ public static final String HTTP_API = "http://";
/**
* Assert the path is valid by ZK rules
@@ -148,9 +143,9 @@ public class RegistryTestHelper extends Assert {
assertEquals(API_WEBHDFS, webhdfs.api);
assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
- List<List<String>> addressList = webhdfs.addresses;
- List<String> url = addressList.get(0);
- String addr = url.get(0);
+ List<Map<String, String>> addressList = webhdfs.addresses;
+ Map<String, String> url = addressList.get(0);
+ String addr = url.get("uri");
assertTrue(addr.contains("http"));
assertTrue(addr.contains(":8020"));
@@ -159,8 +154,9 @@ public class RegistryTestHelper extends Assert {
nnipc.protocolType);
Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+ assertNotNull(ipc2);
- Endpoint web = findEndpoint(record, "web", true, 1, 1);
+ Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1);
assertEquals(1, web.addresses.size());
assertEquals(1, web.addresses.get(0).size());
}
@@ -275,14 +271,14 @@ public class RegistryTestHelper extends Assert {
public static void addSampleEndpoints(ServiceRecord entry, String hostname)
throws URISyntaxException {
assertNotNull(hostname);
- entry.addExternalEndpoint(webEndpoint("web",
+ entry.addExternalEndpoint(webEndpoint(HTTP_API,
new URI("http", hostname + ":80", "/")));
entry.addExternalEndpoint(
restEndpoint(API_WEBHDFS,
new URI("http", hostname + ":8020", "/")));
- Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
- endpoint.addresses.add(tuple(hostname, "8030"));
+ Endpoint endpoint = ipcEndpoint(API_HDFS, null);
+ endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030));
entry.addInternalEndpoint(endpoint);
InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
entry.addInternalEndpoint(
@@ -290,9 +286,7 @@ public class RegistryTestHelper extends Assert {
8050));
entry.addInternalEndpoint(
RegistryTypeUtils.ipcEndpoint(
- IPC2,
- true,
- RegistryTypeUtils.marshall(localhost)));
+ IPC2, localhost));
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
index 14e3b1f..f1814d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.registry.client.binding;
import org.apache.hadoop.registry.RegistryTestHelper;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -31,8 +31,6 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
-
/**
* Test record marshalling
*/
@@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper {
public final Timeout testTimeout = new Timeout(10000);
@Rule
public TestName methodName = new TestName();
+
private static RegistryUtils.ServiceRecordMarshal marshal;
@BeforeClass
@@ -55,42 +54,65 @@ public class TestMarshalling extends RegistryTestHelper {
public void testRoundTrip() throws Throwable {
String persistence = PersistencePolicies.PERMANENT;
ServiceRecord record = createRecord(persistence);
- record.set("customkey","customvalue");
- record.set("customkey2","customvalue2");
+ record.set("customkey", "customvalue");
+ record.set("customkey2", "customvalue2");
+ RegistryTypeUtils.validateServiceRecord("", record);
LOG.info(marshal.toJson(record));
byte[] bytes = marshal.toBytes(record);
- ServiceRecord r2 = marshal.fromBytes("", bytes, 0);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertMatches(record, r2);
+ RegistryTypeUtils.validateServiceRecord("", r2);
}
- @Test
- public void testRoundTripHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.CONTAINER);
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
- assertMatches(record, r2);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoData() throws Throwable {
+ marshal.fromBytes("src", new byte[]{});
}
@Test(expected = NoRecordException.class)
- public void testRoundTripBadHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.APPLICATION);
- byte[] bytes = marshal.toByteswithHeader(record);
- bytes[1] = 0x01;
- marshal.fromBytesWithHeader("src", bytes);
+ public void testUnmarshallNotEnoughData() throws Throwable {
+ // this is nominally JSON -but without the service record header
+ marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallNoBody() throws Throwable {
+ byte[] bytes = "this is not valid JSON at all and should fail".getBytes();
+ marshal.fromBytes("src", bytes);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallWrongType() throws Throwable {
+ byte[] bytes = "{'type':''}".getBytes();
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes);
+ RegistryTypeUtils.validateServiceRecord("validating", serviceRecord);
}
@Test(expected = NoRecordException.class)
- public void testUnmarshallHeaderTooShort() throws Throwable {
- marshal.fromBytesWithHeader("src", new byte[]{'a'});
+ public void testUnmarshallWrongLongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "ThisRecordHasALongButNonMatchingType";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
- @Test(expected = EOFException.class)
- public void testUnmarshallNoBody() throws Throwable {
- byte[] bytes = ServiceRecordHeader.getData();
- marshal.fromBytesWithHeader("src", bytes);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NoRecord";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
+ @Test(expected = InvalidRecordException.class)
+ public void testRecordValidationWrongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NotAServiceRecordType";
+ RegistryTypeUtils.validateServiceRecord("validating", record);
+ }
@Test
public void testUnknownFieldsRoundTrip() throws Throwable {
@@ -102,8 +124,8 @@ public class TestMarshalling extends RegistryTestHelper {
assertEquals("2", record.get("intval"));
assertNull(record.get("null"));
assertEquals("defval", record.get("null", "defval"));
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertEquals("value", r2.get("key"));
assertEquals("2", r2.get("intval"));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
index 7a7f88c..853d7f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.AbstractRegistryTest;
import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
@@ -91,10 +92,8 @@ public class TestRegistryOperations extends AbstractRegistryTest {
childStats.values());
assertEquals(1, records.size());
ServiceRecord record = records.get(ENTRY_PATH);
- assertNotNull(record);
- record.validate();
+ RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record);
assertMatches(written, record);
-
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5924e74d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
index a2a5009..b38d9fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
@@ -353,6 +353,10 @@ application.
<td>Description</td>
</tr>
<tr>
+ <td>type: String</td>
+ <td>Always: "JSONServiceRecord"</td>
+ </tr>
+ <tr>
<td>description: String</td>
<td>Human-readable description.</td>
</tr>
@@ -366,6 +370,8 @@ application.
</tr>
</table>
+The type field MUST be `"JSONServiceRecord"`. Mandating this string allows future record types *and* permits rapid rejection of byte arrays that lack this string before attempting JSON parsing.
+
### YARN Persistence policies
The YARN Resource Manager integration integrates cleanup of service records
@@ -379,7 +385,6 @@ any use of the registry without the RM's participation.
The attributes, `yarn:id` and `yarn:persistence` specify which records
*and any child entries* may be deleted as the associated YARN components complete.
-
The `yarn:id` field defines the application, attempt or container ID to match;
the `yarn:persistence` attribute defines the trigger for record cleanup, and
implicitly the type of the contents of the `yarn:id` field.
@@ -432,31 +437,32 @@ up according the lifecycle of that application.
<td>Description</td>
</tr>
<tr>
- <td>addresses: List[List[String]]</td>
- <td>a list of address tuples whose format depends on the address type</td>
- </tr>
- <tr>
- <td>addressType: String</td>
- <td>format of the binding</td>
- </tr>
+ <td>api: URI as String</td>
+ <td>API implemented at the end of the binding</td>
<tr>
<td>protocol: String</td>
<td>Protocol. Examples:
`http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td>
</tr>
<tr>
- <td>api: String</td>
- <td>API implemented at the end of the binding</td>
+ <td>addressType: String</td>
+ <td>format of the binding</td>
</tr>
+ </tr>
+ <tr>
+ <td>addresses: List[Map[String, String]]</td>
+ <td>a list of address maps</td>
+ </tr>
+
</table>
All string fields have a limit on size, to dissuade services from hiding
complex JSON structures in the text description.
-### Field: Address Type
+#### Field `addressType`: Address Type
-The addressType field defines the string format of entries.
+The `addressType` field defines the string format of entries.
Having separate types is that tools (such as a web viewer) can process binding
strings without having to recognize the protocol.
@@ -467,43 +473,58 @@ strings without having to recognize the protocol.
<td>binding format</td>
</tr>
<tr>
- <td>`url`</td>
- <td>`[URL]`</td>
+ <td>uri</td>
+ <td>uri:URI of endpoint</td>
</tr>
<tr>
- <td>`hostname`</td>
- <td>`[hostname]`</td>
+ <td>hostname</td>
+ <td>hostname: service host</td>
</tr>
<tr>
- <td>`inetaddress`</td>
- <td>`[hostname, port]`</td>
+ <td>inetaddress</td>
+ <td>hostname: service host, port: service port</td>
</tr>
<tr>
- <td>`path`</td>
- <td>`[/path/to/something]`</td>
+ <td>path</td>
+ <td>path: generic unix filesystem path</td>
</tr>
<tr>
- <td>`zookeeper`</td>
- <td>`[quorum-entry, path]`</td>
+ <td>zookeeper</td>
+ <td>hostname: service host, port: service port, path: ZK path</td>
</tr>
</table>
-An actual zookeeper binding consists of a list of `hostname:port` bindings –the
-quorum— and the path within. In the proposed schema, every quorum entry will be
-listed as a triple of `[hostname, port, path]`. Client applications do not
-expect the path to de be different across the quorum. The first entry in the
-list of quorum hosts MUST define the path to be used by all clients. Later
-entries SHOULD list the same path, though clients MUST ignore these.
+In the zookeeper binding, every entry represents a single node in quorum,
+the `hostname` and `port` fields defining the hostname of the ZK instance
+and the port on which it is listening. The `path` field lists zookeeper path
+for applications to use. For example, for HBase this would refer to the znode
+containing information about the HBase cluster.
+
+The path MUST be identical across all address elements in the `addresses` list.
+This ensures that any single address contains enough information to connect
+to the quorum and connect to the relevant znode.
New Address types may be defined; if not standard please prefix with the
character sequence `"x-"`.
-#### **Field: API**
+### Field `api`: API identifier
+
+The API field MUST contain a URI that identifies the specific API of an endpoint.
+These MUST be unique to an API to avoid confusion.
+
+The following strategies are suggested to provide unique URIs for an API
+
+1. The SOAP/WS-* convention of using the URL to where the WSDL defining the service
+2. A URL to the svn/git hosted document defining a REST API
+3. the `classpath` schema followed by a path to a class or package in an application.
+4. The `uuid` schema with a generated UUID.
+
+It is hoped that standard API URIs will be defined for common APIs. Two such non-normative APIs are used in this document
+
+* `http://` : A web site for humans
+* `classpath:javax.management.jmx`: and endpoint supporting the JMX management protocol (RMI-based)
-APIs may be unique to a service class, or may be common across by service
-classes. They MUST be given unique names. These MAY be based on service
-packages but MAY be derived from other naming schemes:
### Examples of Service Entries
@@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer.
{
"description" : "tomcat-based web application",
- "registrationTime" : 1408638082444,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ]
+ "protocol" : "REST",
+ "addresses" : [
+ { "uri" : "http://loadbalancer/" },
+ { "uri" : "http://loadbalancer2/" }
+ ]
} ],
"internal" : [ ]
}
@@ -545,21 +568,23 @@ will trigger the deletion of this entry
/users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001
{
- "registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000001",
- "yarn:persistence" : "3",
- "description" : null,
+ "yarn:persistence" : "container",
+ "description" : "",
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://rack4server3:43572" ] ]
+ "protocol" : "REST",
+ "addresses" : [{ "uri" : "rack4server3:43572" } ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack4server3", "43573" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack4server3",
+ "port" : "48551"
+ } ]
} ]
}
@@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal.
{
"registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000002",
- "yarn:persistence" : "3",
+ "yarn:persistence" : "container",
"description" : null,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
+ "protocol" : "REST",
"addresses" : [ [ "http://rack1server28:35881" ] ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack1server28", "35882" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack1server28",
+ "port" : "48551"
+ } ]
} ]
}
@@ -887,3 +915,106 @@ Implementations may throttle update operations.
**Rate of Polling**
Clients which poll the registry may be throttled.
+
+# Complete service record example
+
+Below is a (non-normative) example of a service record retrieved
+from a YARN application.
+
+
+ {
+ "type" : "JSONServiceRecord",
+ "description" : "Slider Application Master",
+ "yarn:persistence" : "application",
+ "yarn:id" : "application_1414052463672_0028",
+ "external" : [ {
+ "api" : "classpath:org.apache.slider.appmaster",
+ "addressType" : "host/port",
+ "protocol" : "hadoop/IPC",
+ "addresses" : [ {
+ "port" : "48551",
+ "host" : "nn.example.com"
+ } ]
+ }, {
+ "api" : "http://",
+ "addressType" : "uri",
+ "protocol" : "web",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.management",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.registry",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/registry"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.configurations",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.exports",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports"
+ } ]
+ } ],
+ "internal" : [ {
+ "api" : "classpath:org.apache.slider.agents.secure",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:52705/ws/v1/slider/agents"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.agents.oneway",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:33425/ws/v1/slider/agents"
+ } ]
+ } ]
+ }
+
+It publishes a number of endpoints, both internal and external.
+
+External:
+
+1. The IPC hostname and port for client-AM communications
+1. URL to the AM's web UI
+1. A series of REST URLs under the web UI for specific application services.
+The details are irrelevant —note that they use an application-specific API
+value to ensure uniqueness.
+
+Internal:
+1. Two URLS to REST APIs offered by the AM for containers deployed by
+ the application itself.
+
+Python agents running in the containers retrieve the internal endpoint
+URLs to communicate with their AM. The record is resolved on container startup
+and cached until communications problems occur. At that point the registry is
+queried for the current record, then an attempt is made to reconnect to the AM.
+
+Here "connectivity" problems means both "low level socket/IO errors" and
+"failures in HTTPS authentication". The agents use two-way HTTPS authentication
+—if the AM fails and another application starts listening on the same ports
+it will trigger an authentication failure and hence service record reread.
[3/3] hadoop git commit: YARN-2768 Improved Yarn Registry service
record structure (stevel)
Posted by st...@apache.org.
YARN-2768 Improved Yarn Registry service record structure (stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16705780
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16705780
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16705780
Branch: refs/heads/trunk
Commit: 1670578018b3210d518408530858a869e37b23cb
Parents: f5b19be
Author: Steve Loughran <st...@apache.org>
Authored: Thu Nov 6 20:21:25 2014 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Nov 6 20:22:22 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../apache/hadoop/registry/cli/RegistryCli.java | 25 +-
.../registry/client/binding/JsonSerDeser.java | 142 +++++------
.../client/binding/RegistryTypeUtils.java | 166 ++++++++-----
.../registry/client/binding/RegistryUtils.java | 7 +-
.../client/exceptions/NoRecordException.java | 10 +-
.../impl/zk/RegistryOperationsService.java | 12 +-
.../registry/client/types/AddressTypes.java | 2 +
.../hadoop/registry/client/types/Endpoint.java | 131 ++++++++---
.../registry/client/types/ProtocolTypes.java | 7 +-
.../registry/client/types/ServiceRecord.java | 26 +--
.../client/types/ServiceRecordHeader.java | 59 -----
.../src/main/tla/yarnregistry.tla | 94 ++++++--
.../hadoop/registry/RegistryTestHelper.java | 36 ++-
.../client/binding/TestMarshalling.java | 72 ++++--
.../operations/TestRegistryOperations.java | 5 +-
.../src/site/markdown/registry/yarn-registry.md | 233 +++++++++++++++----
17 files changed, 623 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0fe957c..0c5fc4c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -740,6 +740,8 @@ Release 2.6.0 - UNRELEASED
YARN-2677 registry punycoding of usernames doesn't fix all usernames to be
DNS-valid (stevel)
+ YARN-2768 Improved Yarn Registry service record structure (stevel)
+
---
YARN-2598 GHS should show N/A instead of null for the inaccessible information
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
index 863039e..bf2b5e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
@@ -174,24 +175,22 @@ public class RegistryCli extends Configured implements Tool {
ServiceRecord record = registry.resolve(argsList.get(1));
for (Endpoint endpoint : record.external) {
- if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI))
- || (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) {
- sysout.print(" Endpoint(ProtocolType="
- + endpoint.protocolType + ", Api="
- + endpoint.api + "); Uris are: ");
- } else {
- sysout.print(" Endpoint(ProtocolType="
+ sysout.println(" Endpoint(ProtocolType="
+ endpoint.protocolType + ", Api="
+ endpoint.api + ");"
+ " Addresses(AddressType="
+ endpoint.addressType + ") are: ");
- }
- for (List<String> a : endpoint.addresses) {
- sysout.print(a + " ");
- }
- sysout.println();
- }
+ for (Map<String, String> address : endpoint.addresses) {
+ sysout.println(" [ ");
+ for (Map.Entry<String, String> entry : address.entrySet()) {
+ sysout.println(" " + entry.getKey()
+ + ": \"" + entry.getValue() + "\"");
+ }
+ sysout.println(" ]");
+ }
+ sysout.println();
+ }
return 0;
} catch (Exception e) {
syserr.println(analyzeException("resolve", e, argsList));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
index e086e36..af4e4f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.registry.client.binding;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,8 +46,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
/**
* Support for marshalling objects to and from JSON.
@@ -62,30 +61,30 @@ public class JsonSerDeser<T> {
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
private static final String UTF_8 = "UTF-8";
- public static final String E_NO_SERVICE_RECORD = "No service record at path";
+ public static final String E_NO_DATA = "No data at path";
+ public static final String E_DATA_TOO_SHORT = "Data at path too short";
+ public static final String E_MISSING_MARKER_STRING =
+ "Missing marker string: ";
private final Class<T> classType;
private final ObjectMapper mapper;
- private final byte[] header;
/**
* Create an instance bound to a specific type
* @param classType class to marshall
- * @param header byte array to use as header
*/
- public JsonSerDeser(Class<T> classType, byte[] header) {
+ public JsonSerDeser(Class<T> classType) {
Preconditions.checkArgument(classType != null, "null classType");
- Preconditions.checkArgument(header != null, "null header");
this.classType = classType;
this.mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- // make an immutable copy to keep findbugs happy.
- byte[] h = new byte[header.length];
- System.arraycopy(header, 0, h, 0, header.length);
- this.header = h;
}
+ /**
+ * Get the simple name of the class type to be marshalled
+ * @return the name of the class being marshalled
+ */
public String getName() {
return classType.getSimpleName();
}
@@ -183,7 +182,7 @@ public class JsonSerDeser<T> {
if (count != len) {
throw new EOFException(path.toString() + ": read finished prematurely");
}
- return fromBytes(path.toString(), b, 0);
+ return fromBytes(path.toString(), b);
}
/**
@@ -206,8 +205,7 @@ public class JsonSerDeser<T> {
* @throws IOException on any failure
*/
private void writeJsonAsBytes(T instance,
- DataOutputStream dataOutputStream) throws
- IOException {
+ DataOutputStream dataOutputStream) throws IOException {
try {
byte[] b = toBytes(instance);
dataOutputStream.write(b);
@@ -228,36 +226,50 @@ public class JsonSerDeser<T> {
}
/**
- * Convert JSON To bytes, inserting the header
- * @param instance instance to convert
- * @return a byte array
- * @throws IOException
+ * Deserialize from a byte array
+ * @param path path the data came from
+ * @param bytes byte array
+ * @throws IOException all problems
+ * @throws EOFException not enough data
+ * @throws InvalidRecordException if the parsing failed -the record is invalid
*/
- public byte[] toByteswithHeader(T instance) throws IOException {
- byte[] body = toBytes(instance);
-
- ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length);
- buffer.put(header);
- buffer.put(body);
- return buffer.array();
+ public T fromBytes(String path, byte[] bytes) throws IOException,
+ InvalidRecordException {
+ return fromBytes(path, bytes, "");
}
/**
- * Deserialize from a byte array
+ * Deserialize from a byte array, optionally checking for a marker string.
+ * <p>
+ * If the marker parameter is supplied (and not empty), then its presence
+ * will be verified before the JSON parsing takes place; it is a fast-fail
+ * check. If not found, an {@link InvalidRecordException} exception will be
+ * raised
* @param path path the data came from
* @param bytes byte array
- * @return offset in the array to read from
+ * @param marker an optional string which, if set, MUST be present in the
+ * UTF-8 parsed payload.
+ * @return The parsed record
* @throws IOException all problems
* @throws EOFException not enough data
- * @throws InvalidRecordException if the parsing failed -the record is invalid
+ * @throws InvalidRecordException if the JSON parsing failed.
+ * @throws NoRecordException if the data is not considered a record: either
+ * it is too short or it did not contain the marker string.
*/
- public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
- InvalidRecordException {
- int data = bytes.length - offset;
- if (data <= 0) {
- throw new EOFException("No data at " + path);
+ public T fromBytes(String path, byte[] bytes, String marker)
+ throws IOException, NoRecordException, InvalidRecordException {
+ int len = bytes.length;
+ if (len == 0 ) {
+ throw new NoRecordException(path, E_NO_DATA);
+ }
+ if (StringUtils.isNotEmpty(marker) && len < marker.length()) {
+ throw new NoRecordException(path, E_DATA_TOO_SHORT);
+ }
+ String json = new String(bytes, 0, len, UTF_8);
+ if (StringUtils.isNotEmpty(marker)
+ && !json.contains(marker)) {
+ throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker);
}
- String json = new String(bytes, offset, data, UTF_8);
try {
return fromJson(json);
} catch (JsonProcessingException e) {
@@ -266,52 +278,7 @@ public class JsonSerDeser<T> {
}
/**
- * Read from a byte array to a type, checking the header first
- * @param path source of data
- * @param buffer buffer
- * @return the parsed structure
- * Null if the record was too short or the header did not match
- * @throws IOException on a failure
- * @throws NoRecordException if header checks implied there was no record
- * @throws InvalidRecordException if record parsing failed
- */
- @SuppressWarnings("unchecked")
- public T fromBytesWithHeader(String path, byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- if (hlen > 0) {
- if (blen < hlen) {
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- if (!Arrays.equals(header, magic)) {
- LOG.debug("start of entry does not match service record header at {}",
- path);
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- }
- return fromBytes(path, buffer, hlen);
- }
-
- /**
- * Check if a buffer has a header which matches this record type
- * @param buffer buffer
- * @return true if there is a match
- * @throws IOException
- */
- public boolean headerMatches(byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- boolean matches = false;
- if (blen > hlen) {
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- matches = Arrays.equals(header, magic);
- }
- return matches;
- }
-
- /**
- * Convert an object to a JSON string
+ * Convert an instance to a JSON string
* @param instance instance to convert
* @return a JSON string description
* @throws JsonParseException parse problems
@@ -324,4 +291,19 @@ public class JsonSerDeser<T> {
return mapper.writeValueAsString(instance);
}
+ /**
+ * Convert an instance to a string form for output. This is a robust
+ * operation which will convert any JSON-generating exceptions into
+ * error text.
+ * @param instance non-null instance
+ * @return a JSON string
+ */
+ public String toString(T instance) {
+ Preconditions.checkArgument(instance != null, "Null instance argument");
+ try {
+ return toJson(instance);
+ } catch (IOException e) {
+ return "Failed to convert to a string: " + e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
index b4254a3..ec59d59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
@@ -22,17 +22,19 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
-import org.apache.hadoop.registry.client.types.AddressTypes;
+import static org.apache.hadoop.registry.client.types.AddressTypes.*;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Static methods to work with registry types —primarily endpoints and the
@@ -94,79 +96,66 @@ public class RegistryTypeUtils {
Preconditions.checkArgument(protocolType != null, "null protocolType");
Preconditions.checkArgument(hostname != null, "null hostname");
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+ ADDRESS_HOSTNAME_AND_PORT,
protocolType,
- tuplelist(hostname, Integer.toString(port)));
+ hostnamePortPair(hostname, port));
}
/**
* Create an IPC endpoint
* @param api API
- * @param protobuf flag to indicate whether or not the IPC uses protocol
- * buffers
* @param address the address as a tuple of (hostname, port)
* @return the new endpoint
*/
- public static Endpoint ipcEndpoint(String api,
- boolean protobuf, List<String> address) {
- ArrayList<List<String>> addressList = new ArrayList<List<String>>();
- if (address != null) {
- addressList.add(address);
- }
+ public static Endpoint ipcEndpoint(String api, InetSocketAddress address) {
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
- protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
- : ProtocolTypes.PROTOCOL_HADOOP_IPC,
- addressList);
+ ADDRESS_HOSTNAME_AND_PORT,
+ ProtocolTypes.PROTOCOL_HADOOP_IPC,
+ address== null ? null: hostnamePortPair(address));
}
/**
- * Create a single-element list of tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form [["a","b","c"]]
- * @param t1 tuple elements
- * @return a list containing a single tuple
+ * Create a single entry map
+ * @param key map entry key
+ * @param val map entry value
+ * @return a 1 entry map.
*/
- public static List<List<String>> tuplelist(String... t1) {
- List<List<String>> outer = new ArrayList<List<String>>();
- outer.add(tuple(t1));
- return outer;
+ public static Map<String, String> map(String key, String val) {
+ Map<String, String> map = new HashMap<String, String>(1);
+ map.put(key, val);
+ return map;
}
/**
- * Create a tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form ["a","b","c"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a URI
+ * @param uri value
+ * @return a 1 entry map.
*/
- public static List<String> tuple(String... t1) {
- return Arrays.asList(t1);
+ public static Map<String, String> uri(String uri) {
+ return map(ADDRESS_URI, uri);
}
/**
- * Create a tuples from the input, converting all to Strings in the process
- * that is, an input ("a", 7, true) is converted into a list
- * in the form ["a","7,"true"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a (hostname, port) address pair
+ * @param hostname hostname
+ * @param port port
+ * @return a 1 entry map.
*/
- public static List<String> tuple(Object... t1) {
- List<String> l = new ArrayList<String>(t1.length);
- for (Object t : t1) {
- l.add(t.toString());
- }
- return l;
+ public static Map<String, String> hostnamePortPair(String hostname, int port) {
+ Map<String, String> map =
+ map(ADDRESS_HOSTNAME_FIELD, hostname);
+ map.put(ADDRESS_PORT_FIELD, Integer.toString(port));
+ return map;
}
/**
- * Convert a socket address pair into a string tuple, (host, port).
- * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
- * @param address an address
- * @return an element for the address list
+ * Create a (hostname, port) address pair
+ * @param address socket address whose hostname and port are used for the
+ * generated address.
+ * @return a 1 entry map.
*/
- public static List<String> marshall(InetSocketAddress address) {
- return tuple(address.getHostName(), address.getPort());
+ public static Map<String, String> hostnamePortPair(InetSocketAddress address) {
+ return hostnamePortPair(address.getHostName(), address.getPort());
}
/**
@@ -199,25 +188,37 @@ public class RegistryTypeUtils {
if (epr == null) {
return null;
}
- requireAddressType(AddressTypes.ADDRESS_URI, epr);
- List<List<String>> addresses = epr.addresses;
+ requireAddressType(ADDRESS_URI, epr);
+ List<Map<String, String>> addresses = epr.addresses;
if (addresses.size() < 1) {
throw new InvalidRecordException(epr.toString(),
"No addresses in endpoint");
}
List<String> results = new ArrayList<String>(addresses.size());
- for (List<String> address : addresses) {
- if (address.size() != 1) {
- throw new InvalidRecordException(epr.toString(),
- "Address payload invalid: wrong element count: " +
- address.size());
- }
- results.add(address.get(0));
+ for (Map<String, String> address : addresses) {
+ results.add(getAddressField(address, ADDRESS_URI));
}
return results;
}
/**
+ * Get a specific field from an address -raising an exception if
+ * the field is not present
+ * @param address address to query
+ * @param field field to resolve
+ * @return the resolved value. Guaranteed to be non-null.
+ * @throws InvalidRecordException if the field did not resolve
+ */
+ public static String getAddressField(Map<String, String> address,
+ String field) throws InvalidRecordException {
+ String val = address.get(field);
+ if (val == null) {
+ throw new InvalidRecordException("", "Missing address field: " + field);
+ }
+ return val;
+ }
+
+ /**
* Get the address URLs. Guranteed to return at least one address.
* @param epr endpoint
* @return the address as a URL
@@ -237,4 +238,53 @@ public class RegistryTypeUtils {
}
return results;
}
+
+ /**
+ * Validate the record by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param record record to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateServiceRecord(String path, ServiceRecord record)
+ throws InvalidRecordException {
+ if (record == null) {
+ throw new InvalidRecordException(path, "Null record");
+ }
+ if (!ServiceRecord.RECORD_TYPE.equals(record.type)) {
+ throw new InvalidRecordException(path,
+ "invalid record type field: \"" + record.type + "\"");
+ }
+
+ if (record.external != null) {
+ for (Endpoint endpoint : record.external) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ if (record.internal != null) {
+ for (Endpoint endpoint : record.internal) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ }
+
+ /**
+ * Validate the endpoint by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param endpoint endpoint to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateEndpoint(String path, Endpoint endpoint)
+ throws InvalidRecordException {
+ if (endpoint == null) {
+ throw new InvalidRecordException(path, "Null endpoint");
+ }
+ try {
+ endpoint.validate();
+ } catch (RuntimeException e) {
+ throw new InvalidRecordException(path, e.toString());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
index 8caf400..68dc84e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -314,7 +313,7 @@ public class RegistryUtils {
Collection<RegistryPathStatus> stats) throws IOException {
Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
for (RegistryPathStatus stat : stats) {
- if (stat.size > ServiceRecordHeader.getLength()) {
+ if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
// maybe has data
String path = join(parentpath, stat.path);
try {
@@ -344,7 +343,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -362,7 +360,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -382,7 +379,7 @@ public class RegistryUtils {
*/
public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
public ServiceRecordMarshal() {
- super(ServiceRecord.class, ServiceRecordHeader.getData());
+ super(ServiceRecord.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
index 160433f..b81b9d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
@@ -21,17 +21,11 @@ package org.apache.hadoop.registry.client.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
/**
* Raised if there is no {@link ServiceRecord} resolved at the end
- * of the specified path, for reasons such as:
- * <ul>
- * <li>There wasn't enough data to contain a Service Record.</li>
- * <li>The start of the data did not match the {@link ServiceRecordHeader}
- * header.</li>
- * </ul>
- *
+ * of the specified path.
+ * <p>
* There may be valid data of some form at the end of the path, but it does
* not appear to be a Service Record.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
index 7c01bdf..271ab25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.zookeeper.CreateMode;
@@ -103,10 +105,12 @@ public class RegistryOperationsService extends CuratorService
int flags) throws IOException {
Preconditions.checkArgument(record != null, "null record");
validatePath(path);
+ // validate the record before putting it
+ RegistryTypeUtils.validateServiceRecord(path, record);
LOG.info("Bound at {} : {}", path, record);
CreateMode mode = CreateMode.PERSISTENT;
- byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
+ byte[] bytes = serviceRecordMarshal.toBytes(record);
zkSet(path, mode, bytes, getClientAcls(),
((flags & BindFlags.OVERWRITE) != 0));
}
@@ -114,7 +118,11 @@ public class RegistryOperationsService extends CuratorService
@Override
public ServiceRecord resolve(String path) throws IOException {
byte[] bytes = zkRead(path);
- return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
+
+ ServiceRecord record = serviceRecordMarshal.fromBytes(path,
+ bytes, ServiceRecord.RECORD_TYPE);
+ RegistryTypeUtils.validateServiceRecord(path, record);
+ return record;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
index 192819c..36dbf0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
@@ -38,6 +38,8 @@ public interface AddressTypes {
* </pre>
*/
public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+ public static final String ADDRESS_HOSTNAME_FIELD = "host";
+ public static final String ADDRESS_PORT_FIELD = "port";
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
index 51418d9..e4effb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
@@ -21,14 +21,16 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.JsonSerDeser;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Description of a single service/component endpoint.
@@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable {
/**
* a list of address tuples —tuples whose format depends on the address type
*/
- public List<List<String>> addresses;
+ public List<Map<String, String>> addresses;
/**
* Create an empty instance.
@@ -84,10 +86,11 @@ public final class Endpoint implements Cloneable {
this.api = that.api;
this.addressType = that.addressType;
this.protocolType = that.protocolType;
- this.addresses = new ArrayList<List<String>>(that.addresses.size());
- for (List<String> address : addresses) {
- List<String> addr2 = new ArrayList<String>(address.size());
- Collections.copy(address, addr2);
+ this.addresses = newAddresses(that.addresses.size());
+ for (Map<String, String> address : that.addresses) {
+ Map<String, String> addr2 = new HashMap<String, String>(address.size());
+ addr2.putAll(address);
+ addresses.add(addr2);
}
}
@@ -101,17 +104,83 @@ public final class Endpoint implements Cloneable {
public Endpoint(String api,
String addressType,
String protocolType,
- List<List<String>> addrs) {
+ List<Map<String, String>> addrs) {
this.api = api;
this.addressType = addressType;
this.protocolType = protocolType;
- this.addresses = new ArrayList<List<String>>();
+ this.addresses = newAddresses(0);
if (addrs != null) {
addresses.addAll(addrs);
}
}
/**
+ * Build an endpoint with an empty address list
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType) {
+ this.api = api;
+ this.addressType = addressType;
+ this.protocolType = protocolType;
+ this.addresses = newAddresses(0);
+ }
+
+ /**
+ * Build an endpoint with a single address entry.
+ * <p>
+ * This constructor is superfluous given the varags constructor is equivalent
+ * for a single element argument. However, type-erasure in java generics
+ * causes javac to warn about unchecked generic array creation. This
+ * constructor, which represents the common "one address" case, does
+ * not generate compile-time warnings.
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addr address. May be null —in which case it is not added
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String> addr) {
+ this(api, addressType, protocolType);
+ if (addr != null) {
+ addresses.add(addr);
+ }
+ }
+
+ /**
+ * Build an endpoint with a list of addresses
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addrs addresses. Null elements will be skipped
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String>...addrs) {
+ this(api, addressType, protocolType);
+ for (Map<String, String> addr : addrs) {
+ if (addr!=null) {
+ addresses.add(addr);
+ }
+ }
+ }
+
+ /**
+ * Create a new address structure of the requested size
+ * @param size size to create
+ * @return the new list
+ */
+ private List<Map<String, String>> newAddresses(int size) {
+ return new ArrayList<Map<String, String>>(size);
+ }
+
+ /**
* Build an endpoint from a list of URIs; each URI
* is ASCII-encoded and added to the list of addresses.
* @param api API name
@@ -125,40 +194,16 @@ public final class Endpoint implements Cloneable {
this.addressType = AddressTypes.ADDRESS_URI;
this.protocolType = protocolType;
- List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+ List<Map<String, String>> addrs = newAddresses(uris.length);
for (URI uri : uris) {
- addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+ addrs.add(RegistryTypeUtils.uri(uri.toString()));
}
this.addresses = addrs;
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("Endpoint{");
- sb.append("api='").append(api).append('\'');
- sb.append(", addressType='").append(addressType).append('\'');
- sb.append(", protocolType='").append(protocolType).append('\'');
-
- sb.append(", addresses=");
- if (addresses != null) {
- sb.append("[ ");
- for (List<String> address : addresses) {
- sb.append("[ ");
- if (address == null) {
- sb.append("NULL entry in address list");
- } else {
- for (String elt : address) {
- sb.append('"').append(elt).append("\" ");
- }
- }
- sb.append("] ");
- };
- sb.append("] ");
- } else {
- sb.append("(null) ");
- }
- sb.append('}');
- return sb.toString();
+ return marshalToString.toString(this);
}
/**
@@ -173,7 +218,7 @@ public final class Endpoint implements Cloneable {
Preconditions.checkNotNull(addressType, "null addressType field");
Preconditions.checkNotNull(protocolType, "null protocolType field");
Preconditions.checkNotNull(addresses, "null addresses field");
- for (List<String> address : addresses) {
+ for (Map<String, String> address : addresses) {
Preconditions.checkNotNull(address, "null element in address");
}
}
@@ -184,7 +229,19 @@ public final class Endpoint implements Cloneable {
* @throws CloneNotSupportedException
*/
@Override
- protected Object clone() throws CloneNotSupportedException {
+ public Object clone() throws CloneNotSupportedException {
return super.clone();
}
+
+
+ /**
+ * Static instance of service record marshalling
+ */
+ private static class Marshal extends JsonSerDeser<Endpoint> {
+ private Marshal() {
+ super(Endpoint.class);
+ }
+ }
+
+ private static final Marshal marshalToString = new Marshal();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
index f225cf0..b836b00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
@@ -34,16 +34,11 @@ public interface ProtocolTypes {
String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
/**
- * Classic Hadoop IPC : {@value}.
+ * Hadoop IPC, "classic" or protobuf : {@value}.
*/
String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
/**
- * Hadoop protocol buffers IPC: {@value}.
- */
- String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
-
- /**
* Corba IIOP: {@value}.
*/
String PROTOCOL_IIOP = "IIOP";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
index 378127f..9403d31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.codehaus.jackson.annotate.JsonAnyGetter;
import org.codehaus.jackson.annotate.JsonAnySetter;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -41,6 +42,17 @@ import java.util.Map;
public class ServiceRecord implements Cloneable {
/**
+ * A type string which MUST be in the serialized json. This permits
+ * fast discarding of invalid entries
+ */
+ public static final String RECORD_TYPE = "JSONServiceRecord";
+
+ /**
+ * The type field. This must be the string {@link #RECORD_TYPE}
+ */
+ public String type = RECORD_TYPE;
+
+ /**
* Description string
*/
public String description;
@@ -233,17 +245,5 @@ public class ServiceRecord implements Cloneable {
return super.clone();
}
- /**
- * Validate the record by checking for null fields and other invalid
- * conditions
- * @throws NullPointerException if a field is null when it
- * MUST be set.
- * @throws RuntimeException on invalid entries
- */
- public void validate() {
- for (Endpoint endpoint : external) {
- Preconditions.checkNotNull("null endpoint", endpoint);
- endpoint.validate();
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
deleted file mode 100644
index 2f75dba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.client.types;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Service record header; access to the byte array kept private
- * to avoid findbugs warnings of mutability
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class ServiceRecordHeader {
- /**
- * Header of a service record: "jsonservicerec"
- * By making this over 12 bytes long, we can auto-determine which entries
- * in a listing are too short to contain a record without getting their data
- */
- private static final byte[] RECORD_HEADER = {
- 'j', 's', 'o', 'n',
- 's', 'e', 'r', 'v', 'i', 'c', 'e',
- 'r', 'e', 'c'
- };
-
- /**
- * Get the length of the record header
- * @return the header length
- */
- public static int getLength() {
- return RECORD_HEADER.length;
- }
-
- /**
- * Get a clone of the record header
- * @return the new record header.
- */
- public static byte[] getData() {
- byte[] h = new byte[RECORD_HEADER.length];
- System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
- return h;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
index 1c19ade..a950475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
@@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
(*
+============================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+============================================================================
*)
(*
@@ -71,13 +73,22 @@ CONSTANTS
MknodeActions \* all possible mkdir actions
+ASSUME PathChars \in STRING
+ASSUME Paths \in STRING
+
+(* Data in records is JSON, hence a string *)
+ASSUME Data \in STRING
+
+----------------------------------------------------------------------------------------
(* the registry*)
VARIABLE registry
+
(* Sequence of actions to apply to the registry *)
VARIABLE actions
+
----------------------------------------------------------------------------------------
(* Tuple of all variables. *)
@@ -92,7 +103,6 @@ vars == << registry, actions >>
(* Persistence policy *)
PersistPolicySet == {
- "", \* Undefined; field not present. PERMANENT is implied.
"permanent", \* persists until explicitly removed
"application", \* persists until the application finishes
"application-attempt", \* persists until the application attempt finishes
@@ -104,7 +114,6 @@ TypeInvariant ==
/\ \A p \in PersistPolicies: p \in PersistPolicySet
-
----------------------------------------------------------------------------------------
@@ -129,6 +138,14 @@ RegistryEntry == [
]
+(* Define the set of all string to string mappings *)
+
+StringMap == [
+ STRING |-> STRING
+]
+
+
+
(*
An endpoint in a service record
*)
@@ -140,21 +157,14 @@ Endpoint == [
addresses: Addresses
]
-(* Attributes are the set of all string to string mappings *)
-
-Attributes == [
-STRING |-> STRING
-]
-
(*
A service record
*)
ServiceRecord == [
- \* ID -used when applying the persistence policy
- yarn_id: STRING,
- \* the persistence policy
- yarn_persistence: PersistPolicySet,
+ \* This MUST be present: if it is not then the data is not a service record
+ \* This permits shortcut scan & reject of byte arrays without parsing
+ type: "JSONServiceRecord",
\*A description
description: STRING,
@@ -166,9 +176,34 @@ ServiceRecord == [
internal: Endpoints,
\* Attributes are a function
- attributes: Attributes
+ attributes: StringMap
]
+----------------------------------------------------------------------------------------
+
+(*
+ There is an operation serialize whose internals are not defined,
+ Which converts the service records to JSON
+ *)
+
+CONSTANT serialize(_)
+
+(* A function which returns true iff the byte stream is considered a valid service record. *)
+CONSTANT containsServiceRecord(_)
+
+(* A function to deserialize a string to JSON *)
+CONSTANT deserialize(_)
+
+ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN
+
+(* Records can be serialized *)
+ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r))
+
+(* All strings for which containsServiceRecord() holds can be deserialized *)
+ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord
+
+
+
----------------------------------------------------------------------------------------
@@ -304,8 +339,8 @@ validRegistry(R) ==
\* an entry must be the root entry or have a parent entry
/\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
- \* If the entry has data, it must be a service record
- /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+ \* If the entry has data, it must contain a service record
+ /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data))
----------------------------------------------------------------------------------------
@@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff
*)
mknodeSimple(R, path) ==
- LET record == [ path |-> path, data |-> <<>> ]
+ LET entry == [ path |-> path, data |-> <<>> ]
IN \/ exists(R, path)
- \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
+ \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} ))
(*
-For all parents, the mknodeSimpl() criteria must apply.
+For all parents, the mknodeSimple() criteria must apply.
This could be defined recursively, though as TLA+ does not support recursion,
an alternative is required
@@ -350,7 +385,8 @@ an alternative is required
Because this specification is declaring the final state of a operation, not
the implemental, all that is needed is to describe those parents.
-It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+It declares that the mknodeSimple() state applies to the path and all
+its parents in the set R'
*)
mknodeWithParents(R, path) ==
@@ -402,7 +438,7 @@ purge(R, path, id, persistence) ==
=> recursiveDelete(R, p2.path)
(*
-resolveRecord() resolves the record at a path or fails.
+resolveEntry() resolves the record entry at a path or fails.
It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
is guaranteed to return the single entry of that set, iff the choice predicate holds.
@@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects
the sole entry of the resolve operation.
*)
-resolveRecord(R, path) ==
+resolveEntry(R, path) ==
LET l == resolve(R, path) IN
/\ Cardinality(l) = 1
/\ CHOOSE e \in l : TRUE
(*
+ Resolve a record by resolving the entry and deserializing the result
+ *)
+resolveRecord(R, path) ==
+ deserialize(resolveEntry(R, path))
+
+
+(*
The specific action of putting an entry into a record includes validating the record
*)
validRecordToBind(path, record) ==
\* The root entry must have permanent persistence
- isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
- \/ record.attributes["yarn:persistence"] = "")
+ isRootPath(path) => (
+ record.attributes["yarn:persistence"] = "permanent"
+ \/ record.attributes["yarn:persistence"]
+ \/ record.attributes["yarn:persistence"] = {})
(*
@@ -432,13 +477,12 @@ marshalled as the data in the entry
*)
bindRecord(R, path, record) ==
/\ validRecordToBind(path, record)
- /\ bind(R, [path |-> path, data |-> record])
+ /\ bind(R, [path |-> path, data |-> serialize(record)])
----------------------------------------------------------------------------------------
-
(*
The action queue can only contain one of the sets of action types, and
by giving each a unique name, those sets are guaranteed to be disjoint
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
index 460ecad..91602e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.registry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -46,11 +45,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*;
/**
* This is a set of static methods to aid testing the registry operations.
@@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert {
public static final String SC_HADOOP = "org-apache-hadoop";
public static final String USER = "devteam/";
public static final String NAME = "hdfs";
- public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
- public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+ public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs";
+ public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs";
public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
public static final String ENTRY_PATH = PARENT_PATH + NAME;
- public static final String NNIPC = "nnipc";
- public static final String IPC2 = "IPC2";
+ public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437";
+ public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF";
private static final Logger LOG =
LoggerFactory.getLogger(RegistryTestHelper.class);
- public static final String KTUTIL = "ktutil";
private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
new RegistryUtils.ServiceRecordMarshal();
+ public static final String HTTP_API = "http://";
/**
* Assert the path is valid by ZK rules
@@ -148,9 +143,9 @@ public class RegistryTestHelper extends Assert {
assertEquals(API_WEBHDFS, webhdfs.api);
assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
- List<List<String>> addressList = webhdfs.addresses;
- List<String> url = addressList.get(0);
- String addr = url.get(0);
+ List<Map<String, String>> addressList = webhdfs.addresses;
+ Map<String, String> url = addressList.get(0);
+ String addr = url.get("uri");
assertTrue(addr.contains("http"));
assertTrue(addr.contains(":8020"));
@@ -159,8 +154,9 @@ public class RegistryTestHelper extends Assert {
nnipc.protocolType);
Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+ assertNotNull(ipc2);
- Endpoint web = findEndpoint(record, "web", true, 1, 1);
+ Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1);
assertEquals(1, web.addresses.size());
assertEquals(1, web.addresses.get(0).size());
}
@@ -275,14 +271,14 @@ public class RegistryTestHelper extends Assert {
public static void addSampleEndpoints(ServiceRecord entry, String hostname)
throws URISyntaxException {
assertNotNull(hostname);
- entry.addExternalEndpoint(webEndpoint("web",
+ entry.addExternalEndpoint(webEndpoint(HTTP_API,
new URI("http", hostname + ":80", "/")));
entry.addExternalEndpoint(
restEndpoint(API_WEBHDFS,
new URI("http", hostname + ":8020", "/")));
- Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
- endpoint.addresses.add(tuple(hostname, "8030"));
+ Endpoint endpoint = ipcEndpoint(API_HDFS, null);
+ endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030));
entry.addInternalEndpoint(endpoint);
InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
entry.addInternalEndpoint(
@@ -290,9 +286,7 @@ public class RegistryTestHelper extends Assert {
8050));
entry.addInternalEndpoint(
RegistryTypeUtils.ipcEndpoint(
- IPC2,
- true,
- RegistryTypeUtils.marshall(localhost)));
+ IPC2, localhost));
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
index 14e3b1f..f1814d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.registry.client.binding;
import org.apache.hadoop.registry.RegistryTestHelper;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -31,8 +31,6 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
-
/**
* Test record marshalling
*/
@@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper {
public final Timeout testTimeout = new Timeout(10000);
@Rule
public TestName methodName = new TestName();
+
private static RegistryUtils.ServiceRecordMarshal marshal;
@BeforeClass
@@ -55,42 +54,65 @@ public class TestMarshalling extends RegistryTestHelper {
public void testRoundTrip() throws Throwable {
String persistence = PersistencePolicies.PERMANENT;
ServiceRecord record = createRecord(persistence);
- record.set("customkey","customvalue");
- record.set("customkey2","customvalue2");
+ record.set("customkey", "customvalue");
+ record.set("customkey2", "customvalue2");
+ RegistryTypeUtils.validateServiceRecord("", record);
LOG.info(marshal.toJson(record));
byte[] bytes = marshal.toBytes(record);
- ServiceRecord r2 = marshal.fromBytes("", bytes, 0);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertMatches(record, r2);
+ RegistryTypeUtils.validateServiceRecord("", r2);
}
- @Test
- public void testRoundTripHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.CONTAINER);
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
- assertMatches(record, r2);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoData() throws Throwable {
+ marshal.fromBytes("src", new byte[]{});
}
@Test(expected = NoRecordException.class)
- public void testRoundTripBadHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.APPLICATION);
- byte[] bytes = marshal.toByteswithHeader(record);
- bytes[1] = 0x01;
- marshal.fromBytesWithHeader("src", bytes);
+ public void testUnmarshallNotEnoughData() throws Throwable {
+ // this is nominally JSON -but without the service record header
+ marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallNoBody() throws Throwable {
+ byte[] bytes = "this is not valid JSON at all and should fail".getBytes();
+ marshal.fromBytes("src", bytes);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallWrongType() throws Throwable {
+ byte[] bytes = "{'type':''}".getBytes();
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes);
+ RegistryTypeUtils.validateServiceRecord("validating", serviceRecord);
}
@Test(expected = NoRecordException.class)
- public void testUnmarshallHeaderTooShort() throws Throwable {
- marshal.fromBytesWithHeader("src", new byte[]{'a'});
+ public void testUnmarshallWrongLongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "ThisRecordHasALongButNonMatchingType";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
- @Test(expected = EOFException.class)
- public void testUnmarshallNoBody() throws Throwable {
- byte[] bytes = ServiceRecordHeader.getData();
- marshal.fromBytesWithHeader("src", bytes);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NoRecord";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
+ @Test(expected = InvalidRecordException.class)
+ public void testRecordValidationWrongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NotAServiceRecordType";
+ RegistryTypeUtils.validateServiceRecord("validating", record);
+ }
@Test
public void testUnknownFieldsRoundTrip() throws Throwable {
@@ -102,8 +124,8 @@ public class TestMarshalling extends RegistryTestHelper {
assertEquals("2", record.get("intval"));
assertNull(record.get("null"));
assertEquals("defval", record.get("null", "defval"));
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertEquals("value", r2.get("key"));
assertEquals("2", r2.get("intval"));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
index 7a7f88c..853d7f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.AbstractRegistryTest;
import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
@@ -91,10 +92,8 @@ public class TestRegistryOperations extends AbstractRegistryTest {
childStats.values());
assertEquals(1, records.size());
ServiceRecord record = records.get(ENTRY_PATH);
- assertNotNull(record);
- record.validate();
+ RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record);
assertMatches(written, record);
-
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16705780/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
index a2a5009..b38d9fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
@@ -353,6 +353,10 @@ application.
<td>Description</td>
</tr>
<tr>
+ <td>type: String</td>
+ <td>Always: "JSONServiceRecord"</td>
+ </tr>
+ <tr>
<td>description: String</td>
<td>Human-readable description.</td>
</tr>
@@ -366,6 +370,8 @@ application.
</tr>
</table>
+The type field MUST be `"JSONServiceRecord"`. Mandating this string allows future record types *and* permits rapid rejection of byte arrays that lack this string before attempting JSON parsing.
+
### YARN Persistence policies
The YARN Resource Manager integration integrates cleanup of service records
@@ -379,7 +385,6 @@ any use of the registry without the RM's participation.
The attributes, `yarn:id` and `yarn:persistence` specify which records
*and any child entries* may be deleted as the associated YARN components complete.
-
The `yarn:id` field defines the application, attempt or container ID to match;
the `yarn:persistence` attribute defines the trigger for record cleanup, and
implicitly the type of the contents of the `yarn:id` field.
@@ -432,31 +437,32 @@ up according the lifecycle of that application.
<td>Description</td>
</tr>
<tr>
- <td>addresses: List[List[String]]</td>
- <td>a list of address tuples whose format depends on the address type</td>
- </tr>
- <tr>
- <td>addressType: String</td>
- <td>format of the binding</td>
- </tr>
+ <td>api: URI as String</td>
+ <td>API implemented at the end of the binding</td>
<tr>
<td>protocol: String</td>
<td>Protocol. Examples:
`http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td>
</tr>
<tr>
- <td>api: String</td>
- <td>API implemented at the end of the binding</td>
+ <td>addressType: String</td>
+ <td>format of the binding</td>
</tr>
+ </tr>
+ <tr>
+ <td>addresses: List[Map[String, String]]</td>
+ <td>a list of address maps</td>
+ </tr>
+
</table>
All string fields have a limit on size, to dissuade services from hiding
complex JSON structures in the text description.
-### Field: Address Type
+#### Field `addressType`: Address Type
-The addressType field defines the string format of entries.
+The `addressType` field defines the string format of entries.
Having separate types is that tools (such as a web viewer) can process binding
strings without having to recognize the protocol.
@@ -467,43 +473,58 @@ strings without having to recognize the protocol.
<td>binding format</td>
</tr>
<tr>
- <td>`url`</td>
- <td>`[URL]`</td>
+ <td>uri</td>
+ <td>uri:URI of endpoint</td>
</tr>
<tr>
- <td>`hostname`</td>
- <td>`[hostname]`</td>
+ <td>hostname</td>
+ <td>hostname: service host</td>
</tr>
<tr>
- <td>`inetaddress`</td>
- <td>`[hostname, port]`</td>
+ <td>inetaddress</td>
+ <td>hostname: service host, port: service port</td>
</tr>
<tr>
- <td>`path`</td>
- <td>`[/path/to/something]`</td>
+ <td>path</td>
+ <td>path: generic unix filesystem path</td>
</tr>
<tr>
- <td>`zookeeper`</td>
- <td>`[quorum-entry, path]`</td>
+ <td>zookeeper</td>
+ <td>hostname: service host, port: service port, path: ZK path</td>
</tr>
</table>
-An actual zookeeper binding consists of a list of `hostname:port` bindings –the
-quorum— and the path within. In the proposed schema, every quorum entry will be
-listed as a triple of `[hostname, port, path]`. Client applications do not
-expect the path to de be different across the quorum. The first entry in the
-list of quorum hosts MUST define the path to be used by all clients. Later
-entries SHOULD list the same path, though clients MUST ignore these.
+In the zookeeper binding, every entry represents a single node in quorum,
+the `hostname` and `port` fields defining the hostname of the ZK instance
+and the port on which it is listening. The `path` field lists zookeeper path
+for applications to use. For example, for HBase this would refer to the znode
+containing information about the HBase cluster.
+
+The path MUST be identical across all address elements in the `addresses` list.
+This ensures that any single address contains enough information to connect
+to the quorum and connect to the relevant znode.
New Address types may be defined; if not standard please prefix with the
character sequence `"x-"`.
-#### **Field: API**
+### Field `api`: API identifier
+
+The API field MUST contain a URI that identifies the specific API of an endpoint.
+These MUST be unique to an API to avoid confusion.
+
+The following strategies are suggested to provide unique URIs for an API
+
+1. The SOAP/WS-* convention of using the URL to where the WSDL defining the service
+2. A URL to the svn/git hosted document defining a REST API
+3. the `classpath` schema followed by a path to a class or package in an application.
+4. The `uuid` schema with a generated UUID.
+
+It is hoped that standard API URIs will be defined for common APIs. Two such non-normative APIs are used in this document
+
+* `http://` : A web site for humans
+* `classpath:javax.management.jmx`: and endpoint supporting the JMX management protocol (RMI-based)
-APIs may be unique to a service class, or may be common across by service
-classes. They MUST be given unique names. These MAY be based on service
-packages but MAY be derived from other naming schemes:
### Examples of Service Entries
@@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer.
{
"description" : "tomcat-based web application",
- "registrationTime" : 1408638082444,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ]
+ "protocol" : "REST",
+ "addresses" : [
+ { "uri" : "http://loadbalancer/" },
+ { "uri" : "http://loadbalancer2/" }
+ ]
} ],
"internal" : [ ]
}
@@ -545,21 +568,23 @@ will trigger the deletion of this entry
/users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001
{
- "registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000001",
- "yarn:persistence" : "3",
- "description" : null,
+ "yarn:persistence" : "container",
+ "description" : "",
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://rack4server3:43572" ] ]
+ "protocol" : "REST",
+ "addresses" : [{ "uri" : "rack4server3:43572" } ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack4server3", "43573" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack4server3",
+ "port" : "48551"
+ } ]
} ]
}
@@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal.
{
"registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000002",
- "yarn:persistence" : "3",
+ "yarn:persistence" : "container",
"description" : null,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
+ "protocol" : "REST",
"addresses" : [ [ "http://rack1server28:35881" ] ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack1server28", "35882" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack1server28",
+ "port" : "48551"
+ } ]
} ]
}
@@ -887,3 +915,106 @@ Implementations may throttle update operations.
**Rate of Polling**
Clients which poll the registry may be throttled.
+
+# Complete service record example
+
+Below is a (non-normative) example of a service record retrieved
+from a YARN application.
+
+
+ {
+ "type" : "JSONServiceRecord",
+ "description" : "Slider Application Master",
+ "yarn:persistence" : "application",
+ "yarn:id" : "application_1414052463672_0028",
+ "external" : [ {
+ "api" : "classpath:org.apache.slider.appmaster",
+ "addressType" : "host/port",
+ "protocol" : "hadoop/IPC",
+ "addresses" : [ {
+ "port" : "48551",
+ "host" : "nn.example.com"
+ } ]
+ }, {
+ "api" : "http://",
+ "addressType" : "uri",
+ "protocol" : "web",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.management",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.registry",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/registry"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.configurations",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.exports",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports"
+ } ]
+ } ],
+ "internal" : [ {
+ "api" : "classpath:org.apache.slider.agents.secure",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:52705/ws/v1/slider/agents"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.agents.oneway",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:33425/ws/v1/slider/agents"
+ } ]
+ } ]
+ }
+
+It publishes a number of endpoints, both internal and external.
+
+External:
+
+1. The IPC hostname and port for client-AM communications
+1. URL to the AM's web UI
+1. A series of REST URLs under the web UI for specific application services.
+The details are irrelevant —note that they use an application-specific API
+value to ensure uniqueness.
+
+Internal:
+1. Two URLS to REST APIs offered by the AM for containers deployed by
+ the application itself.
+
+Python agents running in the containers retrieve the internal endpoint
+URLs to communicate with their AM. The record is resolved on container startup
+and cached until communications problems occur. At that point the registry is
+queried for the current record, then an attempt is made to reconnect to the AM.
+
+Here "connectivity" problems means both "low level socket/IO errors" and
+"failures in HTTPS authentication". The agents use two-way HTTPS authentication
+—if the AM fails and another application starts listening on the same ports
+it will trigger an authentication failure and hence service record reread.
[2/3] hadoop git commit: YARN-2768 Improved Yarn Registry service
record structure (stevel)
Posted by st...@apache.org.
YARN-2768 Improved Yarn Registry service record structure (stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e333584c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e333584c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e333584c
Branch: refs/heads/branch-2.6
Commit: e333584ca6e37f0a51cb27dadbe9047f7775d8e7
Parents: b557f68
Author: Steve Loughran <st...@apache.org>
Authored: Thu Nov 6 20:21:25 2014 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Nov 6 20:21:49 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../apache/hadoop/registry/cli/RegistryCli.java | 25 +-
.../registry/client/binding/JsonSerDeser.java | 142 +++++------
.../client/binding/RegistryTypeUtils.java | 166 ++++++++-----
.../registry/client/binding/RegistryUtils.java | 7 +-
.../client/exceptions/NoRecordException.java | 10 +-
.../impl/zk/RegistryOperationsService.java | 12 +-
.../registry/client/types/AddressTypes.java | 2 +
.../hadoop/registry/client/types/Endpoint.java | 131 ++++++++---
.../registry/client/types/ProtocolTypes.java | 7 +-
.../registry/client/types/ServiceRecord.java | 26 +--
.../client/types/ServiceRecordHeader.java | 59 -----
.../src/main/tla/yarnregistry.tla | 94 ++++++--
.../hadoop/registry/RegistryTestHelper.java | 36 ++-
.../client/binding/TestMarshalling.java | 72 ++++--
.../operations/TestRegistryOperations.java | 5 +-
.../src/site/markdown/registry/yarn-registry.md | 233 +++++++++++++++----
17 files changed, 623 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5eb58d3..33331b4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -665,6 +665,8 @@ Release 2.6.0 - UNRELEASED
YARN-2677 registry punycoding of usernames doesn't fix all usernames to be
DNS-valid (stevel)
+ YARN-2768 Improved Yarn Registry service record structure (stevel)
+
---
YARN-2598 GHS should show N/A instead of null for the inaccessible information
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
index 863039e..bf2b5e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
@@ -174,24 +175,22 @@ public class RegistryCli extends Configured implements Tool {
ServiceRecord record = registry.resolve(argsList.get(1));
for (Endpoint endpoint : record.external) {
- if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI))
- || (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) {
- sysout.print(" Endpoint(ProtocolType="
- + endpoint.protocolType + ", Api="
- + endpoint.api + "); Uris are: ");
- } else {
- sysout.print(" Endpoint(ProtocolType="
+ sysout.println(" Endpoint(ProtocolType="
+ endpoint.protocolType + ", Api="
+ endpoint.api + ");"
+ " Addresses(AddressType="
+ endpoint.addressType + ") are: ");
- }
- for (List<String> a : endpoint.addresses) {
- sysout.print(a + " ");
- }
- sysout.println();
- }
+ for (Map<String, String> address : endpoint.addresses) {
+ sysout.println(" [ ");
+ for (Map.Entry<String, String> entry : address.entrySet()) {
+ sysout.println(" " + entry.getKey()
+ + ": \"" + entry.getValue() + "\"");
+ }
+ sysout.println(" ]");
+ }
+ sysout.println();
+ }
return 0;
} catch (Exception e) {
syserr.println(analyzeException("resolve", e, argsList));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
index e086e36..af4e4f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.registry.client.binding;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,8 +46,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
/**
* Support for marshalling objects to and from JSON.
@@ -62,30 +61,30 @@ public class JsonSerDeser<T> {
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
private static final String UTF_8 = "UTF-8";
- public static final String E_NO_SERVICE_RECORD = "No service record at path";
+ public static final String E_NO_DATA = "No data at path";
+ public static final String E_DATA_TOO_SHORT = "Data at path too short";
+ public static final String E_MISSING_MARKER_STRING =
+ "Missing marker string: ";
private final Class<T> classType;
private final ObjectMapper mapper;
- private final byte[] header;
/**
* Create an instance bound to a specific type
* @param classType class to marshall
- * @param header byte array to use as header
*/
- public JsonSerDeser(Class<T> classType, byte[] header) {
+ public JsonSerDeser(Class<T> classType) {
Preconditions.checkArgument(classType != null, "null classType");
- Preconditions.checkArgument(header != null, "null header");
this.classType = classType;
this.mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- // make an immutable copy to keep findbugs happy.
- byte[] h = new byte[header.length];
- System.arraycopy(header, 0, h, 0, header.length);
- this.header = h;
}
+ /**
+ * Get the simple name of the class type to be marshalled
+ * @return the name of the class being marshalled
+ */
public String getName() {
return classType.getSimpleName();
}
@@ -183,7 +182,7 @@ public class JsonSerDeser<T> {
if (count != len) {
throw new EOFException(path.toString() + ": read finished prematurely");
}
- return fromBytes(path.toString(), b, 0);
+ return fromBytes(path.toString(), b);
}
/**
@@ -206,8 +205,7 @@ public class JsonSerDeser<T> {
* @throws IOException on any failure
*/
private void writeJsonAsBytes(T instance,
- DataOutputStream dataOutputStream) throws
- IOException {
+ DataOutputStream dataOutputStream) throws IOException {
try {
byte[] b = toBytes(instance);
dataOutputStream.write(b);
@@ -228,36 +226,50 @@ public class JsonSerDeser<T> {
}
/**
- * Convert JSON To bytes, inserting the header
- * @param instance instance to convert
- * @return a byte array
- * @throws IOException
+ * Deserialize from a byte array
+ * @param path path the data came from
+ * @param bytes byte array
+ * @throws IOException all problems
+ * @throws EOFException not enough data
+ * @throws InvalidRecordException if the parsing failed -the record is invalid
*/
- public byte[] toByteswithHeader(T instance) throws IOException {
- byte[] body = toBytes(instance);
-
- ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length);
- buffer.put(header);
- buffer.put(body);
- return buffer.array();
+ public T fromBytes(String path, byte[] bytes) throws IOException,
+ InvalidRecordException {
+ return fromBytes(path, bytes, "");
}
/**
- * Deserialize from a byte array
+ * Deserialize from a byte array, optionally checking for a marker string.
+ * <p>
+ * If the marker parameter is supplied (and not empty), then its presence
+ * will be verified before the JSON parsing takes place; it is a fast-fail
+ * check. If not found, an {@link InvalidRecordException} exception will be
+ * raised
* @param path path the data came from
* @param bytes byte array
- * @return offset in the array to read from
+ * @param marker an optional string which, if set, MUST be present in the
+ * UTF-8 parsed payload.
+ * @return The parsed record
* @throws IOException all problems
* @throws EOFException not enough data
- * @throws InvalidRecordException if the parsing failed -the record is invalid
+ * @throws InvalidRecordException if the JSON parsing failed.
+ * @throws NoRecordException if the data is not considered a record: either
+ * it is too short or it did not contain the marker string.
*/
- public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
- InvalidRecordException {
- int data = bytes.length - offset;
- if (data <= 0) {
- throw new EOFException("No data at " + path);
+ public T fromBytes(String path, byte[] bytes, String marker)
+ throws IOException, NoRecordException, InvalidRecordException {
+ int len = bytes.length;
+ if (len == 0 ) {
+ throw new NoRecordException(path, E_NO_DATA);
+ }
+ if (StringUtils.isNotEmpty(marker) && len < marker.length()) {
+ throw new NoRecordException(path, E_DATA_TOO_SHORT);
+ }
+ String json = new String(bytes, 0, len, UTF_8);
+ if (StringUtils.isNotEmpty(marker)
+ && !json.contains(marker)) {
+ throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker);
}
- String json = new String(bytes, offset, data, UTF_8);
try {
return fromJson(json);
} catch (JsonProcessingException e) {
@@ -266,52 +278,7 @@ public class JsonSerDeser<T> {
}
/**
- * Read from a byte array to a type, checking the header first
- * @param path source of data
- * @param buffer buffer
- * @return the parsed structure
- * Null if the record was too short or the header did not match
- * @throws IOException on a failure
- * @throws NoRecordException if header checks implied there was no record
- * @throws InvalidRecordException if record parsing failed
- */
- @SuppressWarnings("unchecked")
- public T fromBytesWithHeader(String path, byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- if (hlen > 0) {
- if (blen < hlen) {
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- if (!Arrays.equals(header, magic)) {
- LOG.debug("start of entry does not match service record header at {}",
- path);
- throw new NoRecordException(path, E_NO_SERVICE_RECORD);
- }
- }
- return fromBytes(path, buffer, hlen);
- }
-
- /**
- * Check if a buffer has a header which matches this record type
- * @param buffer buffer
- * @return true if there is a match
- * @throws IOException
- */
- public boolean headerMatches(byte[] buffer) throws IOException {
- int hlen = header.length;
- int blen = buffer.length;
- boolean matches = false;
- if (blen > hlen) {
- byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
- matches = Arrays.equals(header, magic);
- }
- return matches;
- }
-
- /**
- * Convert an object to a JSON string
+ * Convert an instance to a JSON string
* @param instance instance to convert
* @return a JSON string description
* @throws JsonParseException parse problems
@@ -324,4 +291,19 @@ public class JsonSerDeser<T> {
return mapper.writeValueAsString(instance);
}
+ /**
+ * Convert an instance to a string form for output. This is a robust
+ * operation which will convert any JSON-generating exceptions into
+ * error text.
+ * @param instance non-null instance
+ * @return a JSON string
+ */
+ public String toString(T instance) {
+ Preconditions.checkArgument(instance != null, "Null instance argument");
+ try {
+ return toJson(instance);
+ } catch (IOException e) {
+ return "Failed to convert to a string: " + e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
index b4254a3..ec59d59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
@@ -22,17 +22,19 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
-import org.apache.hadoop.registry.client.types.AddressTypes;
+import static org.apache.hadoop.registry.client.types.AddressTypes.*;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Static methods to work with registry types —primarily endpoints and the
@@ -94,79 +96,66 @@ public class RegistryTypeUtils {
Preconditions.checkArgument(protocolType != null, "null protocolType");
Preconditions.checkArgument(hostname != null, "null hostname");
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+ ADDRESS_HOSTNAME_AND_PORT,
protocolType,
- tuplelist(hostname, Integer.toString(port)));
+ hostnamePortPair(hostname, port));
}
/**
* Create an IPC endpoint
* @param api API
- * @param protobuf flag to indicate whether or not the IPC uses protocol
- * buffers
* @param address the address as a tuple of (hostname, port)
* @return the new endpoint
*/
- public static Endpoint ipcEndpoint(String api,
- boolean protobuf, List<String> address) {
- ArrayList<List<String>> addressList = new ArrayList<List<String>>();
- if (address != null) {
- addressList.add(address);
- }
+ public static Endpoint ipcEndpoint(String api, InetSocketAddress address) {
return new Endpoint(api,
- AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
- protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
- : ProtocolTypes.PROTOCOL_HADOOP_IPC,
- addressList);
+ ADDRESS_HOSTNAME_AND_PORT,
+ ProtocolTypes.PROTOCOL_HADOOP_IPC,
+ address== null ? null: hostnamePortPair(address));
}
/**
- * Create a single-element list of tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form [["a","b","c"]]
- * @param t1 tuple elements
- * @return a list containing a single tuple
+ * Create a single entry map
+ * @param key map entry key
+ * @param val map entry value
+ * @return a 1 entry map.
*/
- public static List<List<String>> tuplelist(String... t1) {
- List<List<String>> outer = new ArrayList<List<String>>();
- outer.add(tuple(t1));
- return outer;
+ public static Map<String, String> map(String key, String val) {
+ Map<String, String> map = new HashMap<String, String>(1);
+ map.put(key, val);
+ return map;
}
/**
- * Create a tuples from the input.
- * that is, an input ("a","b","c") is converted into a list
- * in the form ["a","b","c"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a URI
+ * @param uri value
+ * @return a 1 entry map.
*/
- public static List<String> tuple(String... t1) {
- return Arrays.asList(t1);
+ public static Map<String, String> uri(String uri) {
+ return map(ADDRESS_URI, uri);
}
/**
- * Create a tuples from the input, converting all to Strings in the process
- * that is, an input ("a", 7, true) is converted into a list
- * in the form ["a","7,"true"]
- * @param t1 tuple elements
- * @return a single tuple as a list
+ * Create a (hostname, port) address pair
+ * @param hostname hostname
+ * @param port port
+ * @return a 1 entry map.
*/
- public static List<String> tuple(Object... t1) {
- List<String> l = new ArrayList<String>(t1.length);
- for (Object t : t1) {
- l.add(t.toString());
- }
- return l;
+ public static Map<String, String> hostnamePortPair(String hostname, int port) {
+ Map<String, String> map =
+ map(ADDRESS_HOSTNAME_FIELD, hostname);
+ map.put(ADDRESS_PORT_FIELD, Integer.toString(port));
+ return map;
}
/**
- * Convert a socket address pair into a string tuple, (host, port).
- * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
- * @param address an address
- * @return an element for the address list
+ * Create a (hostname, port) address pair
+ * @param address socket address whose hostname and port are used for the
+ * generated address.
+ * @return a 1 entry map.
*/
- public static List<String> marshall(InetSocketAddress address) {
- return tuple(address.getHostName(), address.getPort());
+ public static Map<String, String> hostnamePortPair(InetSocketAddress address) {
+ return hostnamePortPair(address.getHostName(), address.getPort());
}
/**
@@ -199,25 +188,37 @@ public class RegistryTypeUtils {
if (epr == null) {
return null;
}
- requireAddressType(AddressTypes.ADDRESS_URI, epr);
- List<List<String>> addresses = epr.addresses;
+ requireAddressType(ADDRESS_URI, epr);
+ List<Map<String, String>> addresses = epr.addresses;
if (addresses.size() < 1) {
throw new InvalidRecordException(epr.toString(),
"No addresses in endpoint");
}
List<String> results = new ArrayList<String>(addresses.size());
- for (List<String> address : addresses) {
- if (address.size() != 1) {
- throw new InvalidRecordException(epr.toString(),
- "Address payload invalid: wrong element count: " +
- address.size());
- }
- results.add(address.get(0));
+ for (Map<String, String> address : addresses) {
+ results.add(getAddressField(address, ADDRESS_URI));
}
return results;
}
/**
+ * Get a specific field from an address -raising an exception if
+ * the field is not present
+ * @param address address to query
+ * @param field field to resolve
+ * @return the resolved value. Guaranteed to be non-null.
+ * @throws InvalidRecordException if the field did not resolve
+ */
+ public static String getAddressField(Map<String, String> address,
+ String field) throws InvalidRecordException {
+ String val = address.get(field);
+ if (val == null) {
+ throw new InvalidRecordException("", "Missing address field: " + field);
+ }
+ return val;
+ }
+
+ /**
* Get the address URLs. Guranteed to return at least one address.
* @param epr endpoint
* @return the address as a URL
@@ -237,4 +238,53 @@ public class RegistryTypeUtils {
}
return results;
}
+
+ /**
+ * Validate the record by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param record record to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateServiceRecord(String path, ServiceRecord record)
+ throws InvalidRecordException {
+ if (record == null) {
+ throw new InvalidRecordException(path, "Null record");
+ }
+ if (!ServiceRecord.RECORD_TYPE.equals(record.type)) {
+ throw new InvalidRecordException(path,
+ "invalid record type field: \"" + record.type + "\"");
+ }
+
+ if (record.external != null) {
+ for (Endpoint endpoint : record.external) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ if (record.internal != null) {
+ for (Endpoint endpoint : record.internal) {
+ validateEndpoint(path, endpoint);
+ }
+ }
+ }
+
+ /**
+ * Validate the endpoint by checking for null fields and other invalid
+ * conditions
+ * @param path path for exceptions
+ * @param endpoint endpoint to validate. May be null
+ * @throws InvalidRecordException on invalid entries
+ */
+ public static void validateEndpoint(String path, Endpoint endpoint)
+ throws InvalidRecordException {
+ if (endpoint == null) {
+ throw new InvalidRecordException(path, "Null endpoint");
+ }
+ try {
+ endpoint.validate();
+ } catch (RuntimeException e) {
+ throw new InvalidRecordException(path, e.toString());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
index 8caf400..68dc84e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -314,7 +313,7 @@ public class RegistryUtils {
Collection<RegistryPathStatus> stats) throws IOException {
Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
for (RegistryPathStatus stat : stats) {
- if (stat.size > ServiceRecordHeader.getLength()) {
+ if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
// maybe has data
String path = join(parentpath, stat.path);
try {
@@ -344,7 +343,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -362,7 +360,6 @@ public class RegistryUtils {
* <p>
* @param operations operation support for fetches
* @param parentpath path of the parent of all the entries
- * @param stats a map of name:value mappings.
* @return a possibly empty map of fullpath:record.
* @throws IOException for any IO Operation that wasn't ignored.
*/
@@ -382,7 +379,7 @@ public class RegistryUtils {
*/
public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
public ServiceRecordMarshal() {
- super(ServiceRecord.class, ServiceRecordHeader.getData());
+ super(ServiceRecord.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
index 160433f..b81b9d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
@@ -21,17 +21,11 @@ package org.apache.hadoop.registry.client.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
/**
* Raised if there is no {@link ServiceRecord} resolved at the end
- * of the specified path, for reasons such as:
- * <ul>
- * <li>There wasn't enough data to contain a Service Record.</li>
- * <li>The start of the data did not match the {@link ServiceRecordHeader}
- * header.</li>
- * </ul>
- *
+ * of the specified path.
+ * <p>
* There may be valid data of some form at the end of the path, but it does
* not appear to be a Service Record.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
index 7c01bdf..271ab25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.zookeeper.CreateMode;
@@ -103,10 +105,12 @@ public class RegistryOperationsService extends CuratorService
int flags) throws IOException {
Preconditions.checkArgument(record != null, "null record");
validatePath(path);
+ // validate the record before putting it
+ RegistryTypeUtils.validateServiceRecord(path, record);
LOG.info("Bound at {} : {}", path, record);
CreateMode mode = CreateMode.PERSISTENT;
- byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
+ byte[] bytes = serviceRecordMarshal.toBytes(record);
zkSet(path, mode, bytes, getClientAcls(),
((flags & BindFlags.OVERWRITE) != 0));
}
@@ -114,7 +118,11 @@ public class RegistryOperationsService extends CuratorService
@Override
public ServiceRecord resolve(String path) throws IOException {
byte[] bytes = zkRead(path);
- return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
+
+ ServiceRecord record = serviceRecordMarshal.fromBytes(path,
+ bytes, ServiceRecord.RECORD_TYPE);
+ RegistryTypeUtils.validateServiceRecord(path, record);
+ return record;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
index 192819c..36dbf0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
@@ -38,6 +38,8 @@ public interface AddressTypes {
* </pre>
*/
public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+ public static final String ADDRESS_HOSTNAME_FIELD = "host";
+ public static final String ADDRESS_PORT_FIELD = "port";
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
index 51418d9..e4effb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
@@ -21,14 +21,16 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.JsonSerDeser;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Description of a single service/component endpoint.
@@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable {
/**
* a list of address tuples —tuples whose format depends on the address type
*/
- public List<List<String>> addresses;
+ public List<Map<String, String>> addresses;
/**
* Create an empty instance.
@@ -84,10 +86,11 @@ public final class Endpoint implements Cloneable {
this.api = that.api;
this.addressType = that.addressType;
this.protocolType = that.protocolType;
- this.addresses = new ArrayList<List<String>>(that.addresses.size());
- for (List<String> address : addresses) {
- List<String> addr2 = new ArrayList<String>(address.size());
- Collections.copy(address, addr2);
+ this.addresses = newAddresses(that.addresses.size());
+ for (Map<String, String> address : that.addresses) {
+ Map<String, String> addr2 = new HashMap<String, String>(address.size());
+ addr2.putAll(address);
+ addresses.add(addr2);
}
}
@@ -101,17 +104,83 @@ public final class Endpoint implements Cloneable {
public Endpoint(String api,
String addressType,
String protocolType,
- List<List<String>> addrs) {
+ List<Map<String, String>> addrs) {
this.api = api;
this.addressType = addressType;
this.protocolType = protocolType;
- this.addresses = new ArrayList<List<String>>();
+ this.addresses = newAddresses(0);
if (addrs != null) {
addresses.addAll(addrs);
}
}
/**
+ * Build an endpoint with an empty address list
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType) {
+ this.api = api;
+ this.addressType = addressType;
+ this.protocolType = protocolType;
+ this.addresses = newAddresses(0);
+ }
+
+ /**
+ * Build an endpoint with a single address entry.
+ * <p>
+ * This constructor is superfluous given the varags constructor is equivalent
+ * for a single element argument. However, type-erasure in java generics
+ * causes javac to warn about unchecked generic array creation. This
+ * constructor, which represents the common "one address" case, does
+ * not generate compile-time warnings.
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addr address. May be null —in which case it is not added
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String> addr) {
+ this(api, addressType, protocolType);
+ if (addr != null) {
+ addresses.add(addr);
+ }
+ }
+
+ /**
+ * Build an endpoint with a list of addresses
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addrs addresses. Null elements will be skipped
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ Map<String, String>...addrs) {
+ this(api, addressType, protocolType);
+ for (Map<String, String> addr : addrs) {
+ if (addr!=null) {
+ addresses.add(addr);
+ }
+ }
+ }
+
+ /**
+ * Create a new address structure of the requested size
+ * @param size size to create
+ * @return the new list
+ */
+ private List<Map<String, String>> newAddresses(int size) {
+ return new ArrayList<Map<String, String>>(size);
+ }
+
+ /**
* Build an endpoint from a list of URIs; each URI
* is ASCII-encoded and added to the list of addresses.
* @param api API name
@@ -125,40 +194,16 @@ public final class Endpoint implements Cloneable {
this.addressType = AddressTypes.ADDRESS_URI;
this.protocolType = protocolType;
- List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+ List<Map<String, String>> addrs = newAddresses(uris.length);
for (URI uri : uris) {
- addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+ addrs.add(RegistryTypeUtils.uri(uri.toString()));
}
this.addresses = addrs;
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("Endpoint{");
- sb.append("api='").append(api).append('\'');
- sb.append(", addressType='").append(addressType).append('\'');
- sb.append(", protocolType='").append(protocolType).append('\'');
-
- sb.append(", addresses=");
- if (addresses != null) {
- sb.append("[ ");
- for (List<String> address : addresses) {
- sb.append("[ ");
- if (address == null) {
- sb.append("NULL entry in address list");
- } else {
- for (String elt : address) {
- sb.append('"').append(elt).append("\" ");
- }
- }
- sb.append("] ");
- };
- sb.append("] ");
- } else {
- sb.append("(null) ");
- }
- sb.append('}');
- return sb.toString();
+ return marshalToString.toString(this);
}
/**
@@ -173,7 +218,7 @@ public final class Endpoint implements Cloneable {
Preconditions.checkNotNull(addressType, "null addressType field");
Preconditions.checkNotNull(protocolType, "null protocolType field");
Preconditions.checkNotNull(addresses, "null addresses field");
- for (List<String> address : addresses) {
+ for (Map<String, String> address : addresses) {
Preconditions.checkNotNull(address, "null element in address");
}
}
@@ -184,7 +229,19 @@ public final class Endpoint implements Cloneable {
* @throws CloneNotSupportedException
*/
@Override
- protected Object clone() throws CloneNotSupportedException {
+ public Object clone() throws CloneNotSupportedException {
return super.clone();
}
+
+
+ /**
+ * Static instance of service record marshalling
+ */
+ private static class Marshal extends JsonSerDeser<Endpoint> {
+ private Marshal() {
+ super(Endpoint.class);
+ }
+ }
+
+ private static final Marshal marshalToString = new Marshal();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
index f225cf0..b836b00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
@@ -34,16 +34,11 @@ public interface ProtocolTypes {
String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
/**
- * Classic Hadoop IPC : {@value}.
+ * Hadoop IPC, "classic" or protobuf : {@value}.
*/
String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
/**
- * Hadoop protocol buffers IPC: {@value}.
- */
- String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
-
- /**
* Corba IIOP: {@value}.
*/
String PROTOCOL_IIOP = "IIOP";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
index 378127f..9403d31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.registry.client.types;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.codehaus.jackson.annotate.JsonAnyGetter;
import org.codehaus.jackson.annotate.JsonAnySetter;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -41,6 +42,17 @@ import java.util.Map;
public class ServiceRecord implements Cloneable {
/**
+ * A type string which MUST be in the serialized json. This permits
+ * fast discarding of invalid entries
+ */
+ public static final String RECORD_TYPE = "JSONServiceRecord";
+
+ /**
+ * The type field. This must be the string {@link #RECORD_TYPE}
+ */
+ public String type = RECORD_TYPE;
+
+ /**
* Description string
*/
public String description;
@@ -233,17 +245,5 @@ public class ServiceRecord implements Cloneable {
return super.clone();
}
- /**
- * Validate the record by checking for null fields and other invalid
- * conditions
- * @throws NullPointerException if a field is null when it
- * MUST be set.
- * @throws RuntimeException on invalid entries
- */
- public void validate() {
- for (Endpoint endpoint : external) {
- Preconditions.checkNotNull("null endpoint", endpoint);
- endpoint.validate();
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
deleted file mode 100644
index 2f75dba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.client.types;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Service record header; access to the byte array kept private
- * to avoid findbugs warnings of mutability
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class ServiceRecordHeader {
- /**
- * Header of a service record: "jsonservicerec"
- * By making this over 12 bytes long, we can auto-determine which entries
- * in a listing are too short to contain a record without getting their data
- */
- private static final byte[] RECORD_HEADER = {
- 'j', 's', 'o', 'n',
- 's', 'e', 'r', 'v', 'i', 'c', 'e',
- 'r', 'e', 'c'
- };
-
- /**
- * Get the length of the record header
- * @return the header length
- */
- public static int getLength() {
- return RECORD_HEADER.length;
- }
-
- /**
- * Get a clone of the record header
- * @return the new record header.
- */
- public static byte[] getData() {
- byte[] h = new byte[RECORD_HEADER.length];
- System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
- return h;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
index 1c19ade..a950475 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
@@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
(*
+============================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+============================================================================
*)
(*
@@ -71,13 +73,22 @@ CONSTANTS
MknodeActions \* all possible mkdir actions
+ASSUME PathChars \in STRING
+ASSUME Paths \in STRING
+
+(* Data in records is JSON, hence a string *)
+ASSUME Data \in STRING
+
+----------------------------------------------------------------------------------------
(* the registry*)
VARIABLE registry
+
(* Sequence of actions to apply to the registry *)
VARIABLE actions
+
----------------------------------------------------------------------------------------
(* Tuple of all variables. *)
@@ -92,7 +103,6 @@ vars == << registry, actions >>
(* Persistence policy *)
PersistPolicySet == {
- "", \* Undefined; field not present. PERMANENT is implied.
"permanent", \* persists until explicitly removed
"application", \* persists until the application finishes
"application-attempt", \* persists until the application attempt finishes
@@ -104,7 +114,6 @@ TypeInvariant ==
/\ \A p \in PersistPolicies: p \in PersistPolicySet
-
----------------------------------------------------------------------------------------
@@ -129,6 +138,14 @@ RegistryEntry == [
]
+(* Define the set of all string to string mappings *)
+
+StringMap == [
+ STRING |-> STRING
+]
+
+
+
(*
An endpoint in a service record
*)
@@ -140,21 +157,14 @@ Endpoint == [
addresses: Addresses
]
-(* Attributes are the set of all string to string mappings *)
-
-Attributes == [
-STRING |-> STRING
-]
-
(*
A service record
*)
ServiceRecord == [
- \* ID -used when applying the persistence policy
- yarn_id: STRING,
- \* the persistence policy
- yarn_persistence: PersistPolicySet,
+ \* This MUST be present: if it is not then the data is not a service record
+ \* This permits shortcut scan & reject of byte arrays without parsing
+ type: "JSONServiceRecord",
\*A description
description: STRING,
@@ -166,9 +176,34 @@ ServiceRecord == [
internal: Endpoints,
\* Attributes are a function
- attributes: Attributes
+ attributes: StringMap
]
+----------------------------------------------------------------------------------------
+
+(*
+ There is an operation serialize whose internals are not defined,
+ Which converts the service records to JSON
+ *)
+
+CONSTANT serialize(_)
+
+(* A function which returns true iff the byte stream is considered a valid service record. *)
+CONSTANT containsServiceRecord(_)
+
+(* A function to deserialize a string to JSON *)
+CONSTANT deserialize(_)
+
+ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN
+
+(* Records can be serialized *)
+ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r))
+
+(* All strings for which containsServiceRecord() holds can be deserialized *)
+ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord
+
+
+
----------------------------------------------------------------------------------------
@@ -304,8 +339,8 @@ validRegistry(R) ==
\* an entry must be the root entry or have a parent entry
/\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
- \* If the entry has data, it must be a service record
- /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+ \* If the entry has data, it must contain a service record
+ /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data))
----------------------------------------------------------------------------------------
@@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff
*)
mknodeSimple(R, path) ==
- LET record == [ path |-> path, data |-> <<>> ]
+ LET entry == [ path |-> path, data |-> <<>> ]
IN \/ exists(R, path)
- \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
+ \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} ))
(*
-For all parents, the mknodeSimpl() criteria must apply.
+For all parents, the mknodeSimple() criteria must apply.
This could be defined recursively, though as TLA+ does not support recursion,
an alternative is required
@@ -350,7 +385,8 @@ an alternative is required
Because this specification is declaring the final state of a operation, not
the implemental, all that is needed is to describe those parents.
-It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+It declares that the mknodeSimple() state applies to the path and all
+its parents in the set R'
*)
mknodeWithParents(R, path) ==
@@ -402,7 +438,7 @@ purge(R, path, id, persistence) ==
=> recursiveDelete(R, p2.path)
(*
-resolveRecord() resolves the record at a path or fails.
+resolveEntry() resolves the record entry at a path or fails.
It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
is guaranteed to return the single entry of that set, iff the choice predicate holds.
@@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects
the sole entry of the resolve operation.
*)
-resolveRecord(R, path) ==
+resolveEntry(R, path) ==
LET l == resolve(R, path) IN
/\ Cardinality(l) = 1
/\ CHOOSE e \in l : TRUE
(*
+ Resolve a record by resolving the entry and deserializing the result
+ *)
+resolveRecord(R, path) ==
+ deserialize(resolveEntry(R, path))
+
+
+(*
The specific action of putting an entry into a record includes validating the record
*)
validRecordToBind(path, record) ==
\* The root entry must have permanent persistence
- isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
- \/ record.attributes["yarn:persistence"] = "")
+ isRootPath(path) => (
+ record.attributes["yarn:persistence"] = "permanent"
+ \/ record.attributes["yarn:persistence"]
+ \/ record.attributes["yarn:persistence"] = {})
(*
@@ -432,13 +477,12 @@ marshalled as the data in the entry
*)
bindRecord(R, path, record) ==
/\ validRecordToBind(path, record)
- /\ bind(R, [path |-> path, data |-> record])
+ /\ bind(R, [path |-> path, data |-> serialize(record)])
----------------------------------------------------------------------------------------
-
(*
The action queue can only contain one of the sets of action types, and
by giving each a unique name, those sets are guaranteed to be disjoint
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
index 460ecad..91602e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.registry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -46,11 +45,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*;
/**
* This is a set of static methods to aid testing the registry operations.
@@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert {
public static final String SC_HADOOP = "org-apache-hadoop";
public static final String USER = "devteam/";
public static final String NAME = "hdfs";
- public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
- public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+ public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs";
+ public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs";
public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
public static final String ENTRY_PATH = PARENT_PATH + NAME;
- public static final String NNIPC = "nnipc";
- public static final String IPC2 = "IPC2";
+ public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437";
+ public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF";
private static final Logger LOG =
LoggerFactory.getLogger(RegistryTestHelper.class);
- public static final String KTUTIL = "ktutil";
private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
new RegistryUtils.ServiceRecordMarshal();
+ public static final String HTTP_API = "http://";
/**
* Assert the path is valid by ZK rules
@@ -148,9 +143,9 @@ public class RegistryTestHelper extends Assert {
assertEquals(API_WEBHDFS, webhdfs.api);
assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
- List<List<String>> addressList = webhdfs.addresses;
- List<String> url = addressList.get(0);
- String addr = url.get(0);
+ List<Map<String, String>> addressList = webhdfs.addresses;
+ Map<String, String> url = addressList.get(0);
+ String addr = url.get("uri");
assertTrue(addr.contains("http"));
assertTrue(addr.contains(":8020"));
@@ -159,8 +154,9 @@ public class RegistryTestHelper extends Assert {
nnipc.protocolType);
Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+ assertNotNull(ipc2);
- Endpoint web = findEndpoint(record, "web", true, 1, 1);
+ Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1);
assertEquals(1, web.addresses.size());
assertEquals(1, web.addresses.get(0).size());
}
@@ -275,14 +271,14 @@ public class RegistryTestHelper extends Assert {
public static void addSampleEndpoints(ServiceRecord entry, String hostname)
throws URISyntaxException {
assertNotNull(hostname);
- entry.addExternalEndpoint(webEndpoint("web",
+ entry.addExternalEndpoint(webEndpoint(HTTP_API,
new URI("http", hostname + ":80", "/")));
entry.addExternalEndpoint(
restEndpoint(API_WEBHDFS,
new URI("http", hostname + ":8020", "/")));
- Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
- endpoint.addresses.add(tuple(hostname, "8030"));
+ Endpoint endpoint = ipcEndpoint(API_HDFS, null);
+ endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030));
entry.addInternalEndpoint(endpoint);
InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
entry.addInternalEndpoint(
@@ -290,9 +286,7 @@ public class RegistryTestHelper extends Assert {
8050));
entry.addInternalEndpoint(
RegistryTypeUtils.ipcEndpoint(
- IPC2,
- true,
- RegistryTypeUtils.marshall(localhost)));
+ IPC2, localhost));
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
index 14e3b1f..f1814d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.registry.client.binding;
import org.apache.hadoop.registry.RegistryTestHelper;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -31,8 +31,6 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
-
/**
* Test record marshalling
*/
@@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper {
public final Timeout testTimeout = new Timeout(10000);
@Rule
public TestName methodName = new TestName();
+
private static RegistryUtils.ServiceRecordMarshal marshal;
@BeforeClass
@@ -55,42 +54,65 @@ public class TestMarshalling extends RegistryTestHelper {
public void testRoundTrip() throws Throwable {
String persistence = PersistencePolicies.PERMANENT;
ServiceRecord record = createRecord(persistence);
- record.set("customkey","customvalue");
- record.set("customkey2","customvalue2");
+ record.set("customkey", "customvalue");
+ record.set("customkey2", "customvalue2");
+ RegistryTypeUtils.validateServiceRecord("", record);
LOG.info(marshal.toJson(record));
byte[] bytes = marshal.toBytes(record);
- ServiceRecord r2 = marshal.fromBytes("", bytes, 0);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertMatches(record, r2);
+ RegistryTypeUtils.validateServiceRecord("", r2);
}
- @Test
- public void testRoundTripHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.CONTAINER);
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
- assertMatches(record, r2);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoData() throws Throwable {
+ marshal.fromBytes("src", new byte[]{});
}
@Test(expected = NoRecordException.class)
- public void testRoundTripBadHeaders() throws Throwable {
- ServiceRecord record = createRecord(PersistencePolicies.APPLICATION);
- byte[] bytes = marshal.toByteswithHeader(record);
- bytes[1] = 0x01;
- marshal.fromBytesWithHeader("src", bytes);
+ public void testUnmarshallNotEnoughData() throws Throwable {
+ // this is nominally JSON -but without the service record header
+ marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallNoBody() throws Throwable {
+ byte[] bytes = "this is not valid JSON at all and should fail".getBytes();
+ marshal.fromBytes("src", bytes);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testUnmarshallWrongType() throws Throwable {
+ byte[] bytes = "{'type':''}".getBytes();
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes);
+ RegistryTypeUtils.validateServiceRecord("validating", serviceRecord);
}
@Test(expected = NoRecordException.class)
- public void testUnmarshallHeaderTooShort() throws Throwable {
- marshal.fromBytesWithHeader("src", new byte[]{'a'});
+ public void testUnmarshallWrongLongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "ThisRecordHasALongButNonMatchingType";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
- @Test(expected = EOFException.class)
- public void testUnmarshallNoBody() throws Throwable {
- byte[] bytes = ServiceRecordHeader.getData();
- marshal.fromBytesWithHeader("src", bytes);
+ @Test(expected = NoRecordException.class)
+ public void testUnmarshallNoType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NoRecord";
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+ bytes, ServiceRecord.RECORD_TYPE);
}
+ @Test(expected = InvalidRecordException.class)
+ public void testRecordValidationWrongType() throws Throwable {
+ ServiceRecord record = new ServiceRecord();
+ record.type = "NotAServiceRecordType";
+ RegistryTypeUtils.validateServiceRecord("validating", record);
+ }
@Test
public void testUnknownFieldsRoundTrip() throws Throwable {
@@ -102,8 +124,8 @@ public class TestMarshalling extends RegistryTestHelper {
assertEquals("2", record.get("intval"));
assertNull(record.get("null"));
assertEquals("defval", record.get("null", "defval"));
- byte[] bytes = marshal.toByteswithHeader(record);
- ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
+ byte[] bytes = marshal.toBytes(record);
+ ServiceRecord r2 = marshal.fromBytes("", bytes);
assertEquals("value", r2.get("key"));
assertEquals("2", r2.get("intval"));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
index 7a7f88c..853d7f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.AbstractRegistryTest;
import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
@@ -91,10 +92,8 @@ public class TestRegistryOperations extends AbstractRegistryTest {
childStats.values());
assertEquals(1, records.size());
ServiceRecord record = records.get(ENTRY_PATH);
- assertNotNull(record);
- record.validate();
+ RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record);
assertMatches(written, record);
-
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e333584c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
index a2a5009..b38d9fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md
@@ -353,6 +353,10 @@ application.
<td>Description</td>
</tr>
<tr>
+ <td>type: String</td>
+ <td>Always: "JSONServiceRecord"</td>
+ </tr>
+ <tr>
<td>description: String</td>
<td>Human-readable description.</td>
</tr>
@@ -366,6 +370,8 @@ application.
</tr>
</table>
+The type field MUST be `"JSONServiceRecord"`. Mandating this string allows future record types *and* permits rapid rejection of byte arrays that lack this string before attempting JSON parsing.
+
### YARN Persistence policies
The YARN Resource Manager integration integrates cleanup of service records
@@ -379,7 +385,6 @@ any use of the registry without the RM's participation.
The attributes, `yarn:id` and `yarn:persistence` specify which records
*and any child entries* may be deleted as the associated YARN components complete.
-
The `yarn:id` field defines the application, attempt or container ID to match;
the `yarn:persistence` attribute defines the trigger for record cleanup, and
implicitly the type of the contents of the `yarn:id` field.
@@ -432,31 +437,32 @@ up according the lifecycle of that application.
<td>Description</td>
</tr>
<tr>
- <td>addresses: List[List[String]]</td>
- <td>a list of address tuples whose format depends on the address type</td>
- </tr>
- <tr>
- <td>addressType: String</td>
- <td>format of the binding</td>
- </tr>
+ <td>api: URI as String</td>
+ <td>API implemented at the end of the binding</td>
<tr>
<td>protocol: String</td>
<td>Protocol. Examples:
`http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td>
</tr>
<tr>
- <td>api: String</td>
- <td>API implemented at the end of the binding</td>
+ <td>addressType: String</td>
+ <td>format of the binding</td>
</tr>
+ </tr>
+ <tr>
+ <td>addresses: List[Map[String, String]]</td>
+ <td>a list of address maps</td>
+ </tr>
+
</table>
All string fields have a limit on size, to dissuade services from hiding
complex JSON structures in the text description.
-### Field: Address Type
+#### Field `addressType`: Address Type
-The addressType field defines the string format of entries.
+The `addressType` field defines the string format of entries.
Having separate types is that tools (such as a web viewer) can process binding
strings without having to recognize the protocol.
@@ -467,43 +473,58 @@ strings without having to recognize the protocol.
<td>binding format</td>
</tr>
<tr>
- <td>`url`</td>
- <td>`[URL]`</td>
+ <td>uri</td>
+ <td>uri:URI of endpoint</td>
</tr>
<tr>
- <td>`hostname`</td>
- <td>`[hostname]`</td>
+ <td>hostname</td>
+ <td>hostname: service host</td>
</tr>
<tr>
- <td>`inetaddress`</td>
- <td>`[hostname, port]`</td>
+ <td>inetaddress</td>
+ <td>hostname: service host, port: service port</td>
</tr>
<tr>
- <td>`path`</td>
- <td>`[/path/to/something]`</td>
+ <td>path</td>
+ <td>path: generic unix filesystem path</td>
</tr>
<tr>
- <td>`zookeeper`</td>
- <td>`[quorum-entry, path]`</td>
+ <td>zookeeper</td>
+ <td>hostname: service host, port: service port, path: ZK path</td>
</tr>
</table>
-An actual zookeeper binding consists of a list of `hostname:port` bindings –the
-quorum— and the path within. In the proposed schema, every quorum entry will be
-listed as a triple of `[hostname, port, path]`. Client applications do not
-expect the path to de be different across the quorum. The first entry in the
-list of quorum hosts MUST define the path to be used by all clients. Later
-entries SHOULD list the same path, though clients MUST ignore these.
+In the zookeeper binding, every entry represents a single node in quorum,
+the `hostname` and `port` fields defining the hostname of the ZK instance
+and the port on which it is listening. The `path` field lists zookeeper path
+for applications to use. For example, for HBase this would refer to the znode
+containing information about the HBase cluster.
+
+The path MUST be identical across all address elements in the `addresses` list.
+This ensures that any single address contains enough information to connect
+to the quorum and connect to the relevant znode.
New Address types may be defined; if not standard please prefix with the
character sequence `"x-"`.
-#### **Field: API**
+### Field `api`: API identifier
+
+The API field MUST contain a URI that identifies the specific API of an endpoint.
+These MUST be unique to an API to avoid confusion.
+
+The following strategies are suggested to provide unique URIs for an API
+
+1. The SOAP/WS-* convention of using the URL to where the WSDL defining the service
+2. A URL to the svn/git hosted document defining a REST API
+3. the `classpath` schema followed by a path to a class or package in an application.
+4. The `uuid` schema with a generated UUID.
+
+It is hoped that standard API URIs will be defined for common APIs. Two such non-normative APIs are used in this document
+
+* `http://` : A web site for humans
+* `classpath:javax.management.jmx`: and endpoint supporting the JMX management protocol (RMI-based)
-APIs may be unique to a service class, or may be common across by service
-classes. They MUST be given unique names. These MAY be based on service
-packages but MAY be derived from other naming schemes:
### Examples of Service Entries
@@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer.
{
"description" : "tomcat-based web application",
- "registrationTime" : 1408638082444,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ]
+ "protocol" : "REST",
+ "addresses" : [
+ { "uri" : "http://loadbalancer/" },
+ { "uri" : "http://loadbalancer2/" }
+ ]
} ],
"internal" : [ ]
}
@@ -545,21 +568,23 @@ will trigger the deletion of this entry
/users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001
{
- "registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000001",
- "yarn:persistence" : "3",
- "description" : null,
+ "yarn:persistence" : "container",
+ "description" : "",
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
- "addresses" : [ [ "http://rack4server3:43572" ] ]
+ "protocol" : "REST",
+ "addresses" : [{ "uri" : "rack4server3:43572" } ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack4server3", "43573" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack4server3",
+ "port" : "48551"
+ } ]
} ]
}
@@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal.
{
"registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000002",
- "yarn:persistence" : "3",
+ "yarn:persistence" : "container",
"description" : null,
"external" : [ {
- "api" : "www",
+ "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri",
- "protocolType" : "REST",
+ "protocol" : "REST",
"addresses" : [ [ "http://rack1server28:35881" ] ]
} ],
"internal" : [ {
- "api" : "jmx",
+ "api" : "classpath:javax.management.jmx",
"addressType" : "host/port",
- "protocolType" : "JMX",
- "addresses" : [ [ "rack1server28", "35882" ] ]
+ "protocol" : "rmi",
+ "addresses" : [ {
+ "host" : "rack1server28",
+ "port" : "48551"
+ } ]
} ]
}
@@ -887,3 +915,106 @@ Implementations may throttle update operations.
**Rate of Polling**
Clients which poll the registry may be throttled.
+
+# Complete service record example
+
+Below is a (non-normative) example of a service record retrieved
+from a YARN application.
+
+
+ {
+ "type" : "JSONServiceRecord",
+ "description" : "Slider Application Master",
+ "yarn:persistence" : "application",
+ "yarn:id" : "application_1414052463672_0028",
+ "external" : [ {
+ "api" : "classpath:org.apache.slider.appmaster",
+ "addressType" : "host/port",
+ "protocol" : "hadoop/IPC",
+ "addresses" : [ {
+ "port" : "48551",
+ "host" : "nn.example.com"
+ } ]
+ }, {
+ "api" : "http://",
+ "addressType" : "uri",
+ "protocol" : "web",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.management",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.registry",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/registry"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.configurations",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.publisher.exports",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports"
+ } ]
+ } ],
+ "internal" : [ {
+ "api" : "classpath:org.apache.slider.agents.secure",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:52705/ws/v1/slider/agents"
+ } ]
+ }, {
+ "api" : "classpath:org.apache.slider.agents.oneway",
+ "addressType" : "uri",
+ "protocol" : "REST",
+ "addresses" : [ {
+ "uri" : "https://nn.example.com:33425/ws/v1/slider/agents"
+ } ]
+ } ]
+ }
+
+It publishes a number of endpoints, both internal and external.
+
+External:
+
+1. The IPC hostname and port for client-AM communications
+1. URL to the AM's web UI
+1. A series of REST URLs under the web UI for specific application services.
+The details are irrelevant —note that they use an application-specific API
+value to ensure uniqueness.
+
+Internal:
+1. Two URLS to REST APIs offered by the AM for containers deployed by
+ the application itself.
+
+Python agents running in the containers retrieve the internal endpoint
+URLs to communicate with their AM. The record is resolved on container startup
+and cached until communications problems occur. At that point the registry is
+queried for the current record, then an attempt is made to reconnect to the AM.
+
+Here "connectivity" problems means both "low level socket/IO errors" and
+"failures in HTTPS authentication". The agents use two-way HTTPS authentication
+—if the AM fails and another application starts listening on the same ports
+it will trigger an authentication failure and hence service record reread.