You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/07 00:49:35 UTC

aurora git commit: Lift the standard `ServerSet` encoding.

Repository: aurora
Updated Branches:
  refs/heads/master 103dae687 -> 1a391d75f


Lift the standard `ServerSet` encoding.

This exposes the standard `ServerSet` `ServiceInstance` encoding to the
`ServerSet` interface for conforming implementations to leverage.

Bugs closed: AURORA-1468

Reviewed at https://reviews.apache.org/r/45829/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1a391d75
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1a391d75
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1a391d75

Branch: refs/heads/master
Commit: 1a391d75f4604bb8a017d53e78d943c057bc6784
Parents: 103dae6
Author: John Sirois <js...@apache.org>
Authored: Wed Apr 6 16:49:32 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Wed Apr 6 16:49:32 2016 -0600

----------------------------------------------------------------------
 .../aurora/common/zookeeper/ServerSet.java      |  90 +++++++++++++++
 .../aurora/common/zookeeper/ServerSetImpl.java  | 114 +------------------
 .../common/zookeeper/ServerSetImplTest.java     |  49 --------
 .../aurora/common/zookeeper/ServerSetTest.java  |  78 +++++++++++++
 .../aurora/common/zookeeper/ServerSetsTest.java |   7 +-
 5 files changed, 172 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
index 6e32083..2d978c1 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -13,16 +13,105 @@
  */
 package org.apache.aurora.common.zookeeper;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
 import org.apache.aurora.common.zookeeper.Group.JoinException;
 
 /**
  * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
  * common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
+ * rendezvous data to zookeeper so that standard clients can interoperate.
  */
 public interface ServerSet {
+
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * This is the default encoding for service instance data in ZooKeeper.
+   */
+  Codec<ServiceInstance> JSON_CODEC = new Codec<ServiceInstance>() {
+    class EndpointSchema {
+      private final String host;
+      private final int port;
+
+      EndpointSchema(Endpoint endpoint) {
+        host = endpoint.getHost();
+        port = endpoint.getPort();
+      }
+
+      Endpoint asEndpoint() {
+        return new Endpoint(host, port);
+      }
+    }
+
+    class ServiceInstanceSchema {
+      private final EndpointSchema serviceEndpoint;
+      private final Map<String, EndpointSchema> additionalEndpoints;
+      private final Status status;
+      private final @Nullable Integer shard;
+
+      ServiceInstanceSchema(ServiceInstance instance) {
+        serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+        if (instance.getAdditionalEndpoints() != null) {
+          additionalEndpoints =
+              Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+        } else {
+          additionalEndpoints = Maps.newHashMap();
+        }
+        status  = instance.getStatus();
+        shard = instance.isSetShard() ? instance.getShard() : null;
+      }
+
+      ServiceInstance asServiceInstance() {
+        ServiceInstance instance =
+            new ServiceInstance(
+                serviceEndpoint.asEndpoint(),
+                Maps.transformValues(additionalEndpoints, EndpointSchema::asEndpoint),
+                status);
+        if (shard != null) {
+          instance.setShard(shard);
+        }
+        return instance;
+      }
+    }
+
+    private final Charset encoding = Charsets.UTF_8;
+    private final Gson gson = new Gson();
+
+    @Override
+    public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+      Writer writer = new OutputStreamWriter(sink, encoding);
+      gson.toJson(new ServiceInstanceSchema(instance), writer);
+      writer.flush();
+    }
+
+    @Override
+    public ServiceInstance deserialize(InputStream source) throws IOException {
+      InputStreamReader reader = new InputStreamReader(source, encoding);
+      return gson.fromJson(reader, ServiceInstanceSchema.class).asServiceInstance();
+    }
+  };
+
   /**
    * Attempts to join a server set for this logical service group.
    *
@@ -41,6 +130,7 @@ public interface ServerSet {
    * A handle to a service endpoint's status data that allows updating it to track current events.
    */
   interface EndpointStatus {
+
     /**
      * Removes the endpoint from the server set.
      *

http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
index 8b385b8..ace4980 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -14,30 +14,21 @@
 package org.apache.aurora.common.zookeeper;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
 import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Set;
 
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
@@ -45,12 +36,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-import com.google.gson.Gson;
 
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
 import org.apache.aurora.common.util.BackoffHelper;
@@ -93,7 +82,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
    * @param path the name-service path of the service to connect to
    */
   public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, new Group(zkClient, acl, path), createCodec());
+    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
   }
 
   /**
@@ -103,7 +92,7 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
    * @param group the server group
    */
   public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, createCodec());
+    this(zkClient, group, JSON_CODEC);
   }
 
   /**
@@ -357,103 +346,4 @@ public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance>
       LOG.info(message.toString());
     }
   }
-
-  private static class EndpointSchema {
-    final String host;
-    final Integer port;
-
-    EndpointSchema(Endpoint endpoint) {
-      Preconditions.checkNotNull(endpoint);
-      this.host = endpoint.getHost();
-      this.port = endpoint.getPort();
-    }
-
-    String getHost() {
-      return host;
-    }
-
-    Integer getPort() {
-      return port;
-    }
-  }
-
-  private static class ServiceInstanceSchema {
-    final EndpointSchema serviceEndpoint;
-    final Map<String, EndpointSchema> additionalEndpoints;
-    final Status status;
-    final Integer shard;
-
-    ServiceInstanceSchema(ServiceInstance instance) {
-      this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
-      if (instance.getAdditionalEndpoints() != null) {
-        this.additionalEndpoints = Maps.transformValues(
-            instance.getAdditionalEndpoints(),
-            EndpointSchema::new
-        );
-      } else {
-        this.additionalEndpoints = Maps.newHashMap();
-      }
-      this.status  = instance.getStatus();
-      this.shard = instance.isSetShard() ? instance.getShard() : null;
-    }
-
-    EndpointSchema getServiceEndpoint() {
-      return serviceEndpoint;
-    }
-
-    Map<String, EndpointSchema> getAdditionalEndpoints() {
-      return additionalEndpoints;
-    }
-
-    Status getStatus() {
-      return status;
-    }
-
-    Integer getShard() {
-      return shard;
-    }
-  }
-
-  /**
-   * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the
-   * __isset_bit_vector internal thrift struct field that tracks primitive types.
-   */
-  private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
-    private static final Charset ENCODING = Charsets.UTF_8;
-    private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class;
-    private final Gson gson = new Gson();
-
-    @Override
-    public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
-      Writer w = new OutputStreamWriter(sink, ENCODING);
-      gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
-      w.flush();
-    }
-
-    @Override
-    public ServiceInstance deserialize(InputStream source) throws IOException {
-      ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS);
-      Endpoint primary = new Endpoint(
-          output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort());
-      Map<String, Endpoint> additional = Maps.transformValues(
-          output.getAdditionalEndpoints(),
-          endpoint -> new Endpoint(endpoint.getHost(), endpoint.getPort())
-      );
-      ServiceInstance instance =
-          new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus());
-      if (output.getShard() != null) {
-        instance.setShard(output.getShard());
-      }
-      return instance;
-    }
-  }
-
-  /**
-   * Returns a codec for {@link ServiceInstance} objects that translates to and from JSON.
-   *
-   * @return a new codec instance.
-   */
-  public static Codec<ServiceInstance> createCodec() {
-    return new AdaptedJsonCodec();
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
index 37be70b..73049d8 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.common.zookeeper;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -26,7 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.net.pool.DynamicHostSet;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
@@ -44,7 +42,6 @@ import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -194,52 +191,6 @@ public class ServerSetImplTest extends BaseZooKeeperTest {
   }
 
   @Test
-  public void testJsonCodecRoundtrip() throws Exception {
-    Codec<ServiceInstance> codec = ServerSetImpl.createCodec();
-    ServiceInstance instance1 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http", new Endpoint("foo", 8080)),
-        Status.ALIVE)
-        .setShard(0);
-    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
-
-    ServiceInstance instance2 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance2, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
-
-    ServiceInstance instance3 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.<String, Endpoint>of(),
-        Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance3, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
-  }
-
-  @Test
-  public void testJsonCompatibility() throws IOException {
-    ServiceInstance instance = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http", new Endpoint("foo", 8080)),
-        Status.ALIVE).setShard(42);
-
-    ByteArrayOutputStream results = new ByteArrayOutputStream();
-    ServerSetImpl.createCodec().serialize(instance, results);
-    assertEquals(
-        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
-            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
-            + "\"status\":\"ALIVE\","
-            + "\"shard\":42}",
-        results.toString());
-  }
-
-  @Test
   public void testUnwatchOnException() throws Exception {
     IMocksControl control = createControl();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
new file mode 100644
index 0000000..b48c1f1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ServerSetTest {
+
+  @Test
+  public void testJsonCodecRoundtrip() throws Exception {
+    Codec<ServiceInstance> codec = ServerSet.JSON_CODEC;
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance2, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.<String, Endpoint>of(),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance3, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+  }
+
+  @Test
+  public void testJsonCompatibility() throws IOException {
+    ServiceInstance instance = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE).setShard(42);
+
+    ByteArrayOutputStream results = new ByteArrayOutputStream();
+    ServerSet.JSON_CODEC.serialize(instance, results);
+    assertEquals(
+        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+            + "\"status\":\"ALIVE\","
+            + "\"shard\":42}",
+        results.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/1a391d75/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
index 85b89d5..0e67191 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -18,7 +18,6 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
@@ -33,12 +32,10 @@ public class ServerSetsTest {
     Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
     Status status = Status.ALIVE;
 
-    Codec<ServiceInstance> codec = ServerSetImpl.createCodec();
-
     byte[] data = ServerSets.serializeServiceInstance(
-        endpoint, additionalEndpoints, status, codec);
+        endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
 
-    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, codec);
+    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
 
     assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
     assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());