You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/07 00:49:35 UTC
aurora git commit: Lift the standard `ServerSet` encoding.
Repository: aurora
Updated Branches:
refs/heads/master 103dae687 -> 1a391d75f
Lift the standard `ServerSet` encoding.
This exposes the standard `ServerSet` `ServiceInstance` encoding to the
`ServerSet` interface for conforming implementations to leverage.
Bugs closed: AURORA-1468
Reviewed at https://reviews.apache.org/r/45829/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1a391d75
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1a391d75
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1a391d75
Branch: refs/heads/master
Commit: 1a391d75f4604bb8a017d53e78d943c057bc6784
Parents: 103dae6
Author: John Sirois <js...@apache.org>
Authored: Wed Apr 6 16:49:32 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Wed Apr 6 16:49:32 2016 -0600
----------------------------------------------------------------------
.../aurora/common/zookeeper/ServerSet.java | 90 +++++++++++++++
.../aurora/common/zookeeper/ServerSetImpl.java | 114 +------------------
.../common/zookeeper/ServerSetImplTest.java | 49 --------
.../aurora/common/zookeeper/ServerSetTest.java | 78 +++++++++++++
.../aurora/common/zookeeper/ServerSetsTest.java | 7 +-
5 files changed, 172 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
index 6e32083..2d978c1 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -13,16 +13,105 @@
*/
package org.apache.aurora.common.zookeeper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
import java.util.Map;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
import org.apache.aurora.common.zookeeper.Group.JoinException;
/**
* A logical set of servers registered in ZooKeeper. Intended to be used by servers in a
* common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
+ * rendezvous data to zookeeper so that standard clients can interoperate.
*/
public interface ServerSet {
+
+ /**
+ * Encodes a {@link ServiceInstance} as a JSON object.
+ *
+ * This is the default encoding for service instance data in ZooKeeper.
+ */
+ Codec<ServiceInstance> JSON_CODEC = new Codec<ServiceInstance>() {
+ class EndpointSchema {
+ private final String host;
+ private final int port;
+
+ EndpointSchema(Endpoint endpoint) {
+ host = endpoint.getHost();
+ port = endpoint.getPort();
+ }
+
+ Endpoint asEndpoint() {
+ return new Endpoint(host, port);
+ }
+ }
+
+ class ServiceInstanceSchema {
+ private final EndpointSchema serviceEndpoint;
+ private final Map<String, EndpointSchema> additionalEndpoints;
+ private final Status status;
+ private final @Nullable Integer shard;
+
+ ServiceInstanceSchema(ServiceInstance instance) {
+ serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+ if (instance.getAdditionalEndpoints() != null) {
+ additionalEndpoints =
+ Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+ } else {
+ additionalEndpoints = Maps.newHashMap();
+ }
+ status = instance.getStatus();
+ shard = instance.isSetShard() ? instance.getShard() : null;
+ }
+
+ ServiceInstance asServiceInstance() {
+ ServiceInstance instance =
+ new ServiceInstance(
+ serviceEndpoint.asEndpoint(),
+ Maps.transformValues(additionalEndpoints, EndpointSchema::asEndpoint),
+ status);
+ if (shard != null) {
+ instance.setShard(shard);
+ }
+ return instance;
+ }
+ }
+
+ private final Charset encoding = Charsets.UTF_8;
+ private final Gson gson = new Gson();
+
+ @Override
+ public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+ Writer writer = new OutputStreamWriter(sink, encoding);
+ gson.toJson(new ServiceInstanceSchema(instance), writer);
+ writer.flush();
+ }
+
+ @Override
+ public ServiceInstance deserialize(InputStream source) throws IOException {
+ InputStreamReader reader = new InputStreamReader(source, encoding);
+ return gson.fromJson(reader, ServiceInstanceSchema.class).asServiceInstance();
+ }
+ };
+
/**
* Attempts to join a server set for this logical service group.
*
@@ -41,6 +130,7 @@ public interface ServerSet {
* A handle to a service endpoint's status data that allows updating it to track current events.
*/
interface EndpointStatus {
+
/**
* Removes the endpoint from the server set.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
index 8b385b8..ace4980 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -14,30 +14,21 @@
package org.apache.aurora.common.zookeeper;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
@@ -45,12 +36,10 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.common.util.concurrent.UncheckedExecutionException;
-import com.google.gson.Gson;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
import org.apache.aurora.common.util.BackoffHelper;
@@ -93,7 +82,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
* @param path the name-service path of the service to connect to
*/
public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
- this(zkClient, new Group(zkClient, acl, path), createCodec());
+ this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
}
/**
@@ -103,7 +92,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
* @param group the server group
*/
public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
- this(zkClient, group, createCodec());
+ this(zkClient, group, JSON_CODEC);
}
/**
@@ -357,103 +346,4 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
LOG.info(message.toString());
}
}
-
- private static class EndpointSchema {
- final String host;
- final Integer port;
-
- EndpointSchema(Endpoint endpoint) {
- Preconditions.checkNotNull(endpoint);
- this.host = endpoint.getHost();
- this.port = endpoint.getPort();
- }
-
- String getHost() {
- return host;
- }
-
- Integer getPort() {
- return port;
- }
- }
-
- private static class ServiceInstanceSchema {
- final EndpointSchema serviceEndpoint;
- final Map<String, EndpointSchema> additionalEndpoints;
- final Status status;
- final Integer shard;
-
- ServiceInstanceSchema(ServiceInstance instance) {
- this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
- if (instance.getAdditionalEndpoints() != null) {
- this.additionalEndpoints = Maps.transformValues(
- instance.getAdditionalEndpoints(),
- EndpointSchema::new
- );
- } else {
- this.additionalEndpoints = Maps.newHashMap();
- }
- this.status = instance.getStatus();
- this.shard = instance.isSetShard() ? instance.getShard() : null;
- }
-
- EndpointSchema getServiceEndpoint() {
- return serviceEndpoint;
- }
-
- Map<String, EndpointSchema> getAdditionalEndpoints() {
- return additionalEndpoints;
- }
-
- Status getStatus() {
- return status;
- }
-
- Integer getShard() {
- return shard;
- }
- }
-
- /**
- * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the
- * __isset_bit_vector internal thrift struct field that tracks primitive types.
- */
- private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
- private static final Charset ENCODING = Charsets.UTF_8;
- private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class;
- private final Gson gson = new Gson();
-
- @Override
- public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
- Writer w = new OutputStreamWriter(sink, ENCODING);
- gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
- w.flush();
- }
-
- @Override
- public ServiceInstance deserialize(InputStream source) throws IOException {
- ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS);
- Endpoint primary = new Endpoint(
- output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort());
- Map<String, Endpoint> additional = Maps.transformValues(
- output.getAdditionalEndpoints(),
- endpoint -> new Endpoint(endpoint.getHost(), endpoint.getPort())
- );
- ServiceInstance instance =
- new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus());
- if (output.getShard() != null) {
- instance.setShard(output.getShard());
- }
- return instance;
- }
- }
-
- /**
- * Returns a codec for {@link ServiceInstance} objects that translates to and from JSON.
- *
- * @return a new codec instance.
- */
- public static Codec<ServiceInstance> createCodec() {
- return new AdaptedJsonCodec();
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
index 37be70b..73049d8 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.common.zookeeper;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
@@ -26,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.net.pool.DynamicHostSet;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
@@ -44,7 +42,6 @@ import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -194,52 +191,6 @@ public class ServerSetImplTest extends BaseZooKeeperTest {
}
@Test
- public void testJsonCodecRoundtrip() throws Exception {
- Codec<ServiceInstance> codec = ServerSetImpl.createCodec();
- ServiceInstance instance1 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http", new Endpoint("foo", 8080)),
- Status.ALIVE)
- .setShard(0);
- byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
-
- ServiceInstance instance2 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
- Status.ALIVE);
- data = ServerSets.serializeServiceInstance(instance2, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
-
- ServiceInstance instance3 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.<String, Endpoint>of(),
- Status.ALIVE);
- data = ServerSets.serializeServiceInstance(instance3, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
- }
-
- @Test
- public void testJsonCompatibility() throws IOException {
- ServiceInstance instance = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http", new Endpoint("foo", 8080)),
- Status.ALIVE).setShard(42);
-
- ByteArrayOutputStream results = new ByteArrayOutputStream();
- ServerSetImpl.createCodec().serialize(instance, results);
- assertEquals(
- "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
- + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
- + "\"status\":\"ALIVE\","
- + "\"shard\":42}",
- results.toString());
- }
-
- @Test
public void testUnwatchOnException() throws Exception {
IMocksControl control = createControl();
http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
new file mode 100644
index 0000000..b48c1f1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ServerSetTest {
+
+ @Test
+ public void testJsonCodecRoundtrip() throws Exception {
+ Codec<ServiceInstance> codec = ServerSet.JSON_CODEC;
+ ServiceInstance instance1 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE)
+ .setShard(0);
+ byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance2 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance2, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance3 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.<String, Endpoint>of(),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance3, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ }
+
+ @Test
+ public void testJsonCompatibility() throws IOException {
+ ServiceInstance instance = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE).setShard(42);
+
+ ByteArrayOutputStream results = new ByteArrayOutputStream();
+ ServerSet.JSON_CODEC.serialize(instance, results);
+ assertEquals(
+ "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+ + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+ + "\"status\":\"ALIVE\","
+ + "\"shard\":42}",
+ results.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
index 85b89d5..0e67191 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -18,7 +18,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
-import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
@@ -33,12 +32,10 @@ public class ServerSetsTest {
Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
Status status = Status.ALIVE;
- Codec<ServiceInstance> codec = ServerSetImpl.createCodec();
-
byte[] data = ServerSets.serializeServiceInstance(
- endpoint, additionalEndpoints, status, codec);
+ endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
- ServiceInstance instance = ServerSets.deserializeServiceInstance(data, codec);
+ ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());