You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:34 UTC

[37/50] [abbrv] git commit: (TWILL-53) Expose DiscoveryServiceClient through TwillContext to allow a TwillRunnable to discover services announced within the same TwillApplication.

(TWILL-53) Expose DiscoveryServiceClient through TwillContext to allow a TwillRunnable to discover services announced within the same TwillApplication.

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/875fbca9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/875fbca9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/875fbca9

Branch: refs/heads/site
Commit: 875fbca941a8e9a24d29d934a9c550c25ed722d9
Parents: 3d3c0e9
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Mar 11 16:45:17 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Wed Mar 12 13:14:16 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/twill/api/TwillContext.java |  14 +-
 .../twill/internal/BasicTwillContext.java       |  12 +-
 .../twill/discovery/DiscoveryServiceClient.java |   4 +-
 .../internal/container/TwillContainerMain.java  |   5 +-
 .../apache/twill/yarn/ServiceDiscoveryTest.java | 129 +++++++++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   3 +-
 6 files changed, 159 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
index b4ddb6e..f7a7ac1 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillContext.java
@@ -17,12 +17,15 @@
  */
 package org.apache.twill.api;
 
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ServiceDiscovered;
+
 import java.net.InetAddress;
 
 /**
  * Represents the runtime context of a {@link TwillRunnable}.
  */
-public interface TwillContext extends ServiceAnnouncer {
+public interface TwillContext extends ServiceAnnouncer, DiscoveryServiceClient {
 
   /**
    * Returns the {@link RunId} of this running instance of {@link TwillRunnable}.
@@ -73,4 +76,13 @@ public interface TwillContext extends ServiceAnnouncer {
    * Returns the amount of memory in MB the runnable is allowed to use.
    */
   int getMaxMemoryMB();
+
+  /**
+   * Discovers the service with the given name that is announced within the same {@link TwillApplication}.
+   *
+   * @param name Name of the service
+   * @return A {@link ServiceDiscovered} object representing the result.
+   */
+  @Override
+  ServiceDiscovered discover(String name);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
index 61bdaef..4a503e0 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -23,6 +23,8 @@ import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.discovery.Discoverable;
 import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ServiceDiscovered;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -40,12 +42,14 @@ public final class BasicTwillContext implements TwillContext {
   private final TwillRunnableSpecification spec;
   private final int instanceId;
   private final DiscoveryService discoveryService;
+  private final DiscoveryServiceClient discoveryServiceClient;
   private final int allowedMemoryMB;
   private final int virtualCores;
   private volatile int instanceCount;
 
   public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
-                           TwillRunnableSpecification spec, int instanceId, DiscoveryService discoveryService,
+                           TwillRunnableSpecification spec, int instanceId,
+                           DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient,
                            int instanceCount, int allowedMemoryMB, int virtualCores) {
     this.runId = runId;
     this.appRunId = appRunId;
@@ -55,6 +59,7 @@ public final class BasicTwillContext implements TwillContext {
     this.spec = spec;
     this.instanceId = instanceId;
     this.discoveryService = discoveryService;
+    this.discoveryServiceClient = discoveryServiceClient;
     this.instanceCount = instanceCount;
     this.allowedMemoryMB = allowedMemoryMB;
     this.virtualCores = virtualCores;
@@ -128,4 +133,9 @@ public final class BasicTwillContext implements TwillContext {
       }
     });
   }
+
+  @Override
+  public ServiceDiscovered discover(String name) {
+    return discoveryServiceClient.discover(name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
index a58c83d..1298a16 100644
--- a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -23,10 +23,10 @@ package org.apache.twill.discovery;
 public interface DiscoveryServiceClient {
 
   /**
-   * Retrieves a list of {@link Discoverable} for the a service with the given name.
+   * Discovers the service with the given name.
    *
    * @param name Name of the service
-   * @return A {@link org.apache.twill.discovery.ServiceDiscovered} object representing the result.
+   * @return A {@link ServiceDiscovered} object representing the result.
    */
   ServiceDiscovered discover(String name);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 072bbb0..c3aece6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -31,7 +31,6 @@ import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.Arguments;
 import org.apache.twill.internal.BasicTwillContext;
@@ -86,7 +85,7 @@ public final class TwillContainerMain extends ServiceMain {
         ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
                                  RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
 
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
     TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
     renameLocalFiles(twillSpec.getRunnables().get(runnableName));
@@ -98,7 +97,7 @@ public final class TwillContainerMain extends ServiceMain {
       runId, appRunId, containerInfo.getHost(),
       arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
       arguments.getArguments().toArray(new String[0]),
-      runnableSpec, instanceId, discoveryService, instanceCount,
+      runnableSpec, instanceId, discoveryService, discoveryService, instanceCount,
       containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
     );
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTest.java
new file mode 100644
index 0000000..77bc181
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Services;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for the ability to discover the existence of services through {@link TwillContext}.
+ */
+public final class ServiceDiscoveryTest extends BaseYarnTest {
+
+  @Test
+  public void testServiceDiscovery() throws InterruptedException, ExecutionException, TimeoutException {
+    TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+    TwillController controller = twillRunner
+      .prepare(new ServiceApplication())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withArguments("r1", "12345")
+      .withArguments("r2", "45678")
+      .start();
+
+    ListenableFuture<Service.State> completion = Services.getCompletionFuture(controller);
+    try {
+      completion.get(60, TimeUnit.SECONDS);
+    } finally {
+      controller.stopAndWait();
+    }
+  }
+
+  /**
+   * An application that contains two {@link ServiceRunnable} instances.
+   */
+  public static final class ServiceApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("ServiceApp")
+        .withRunnable()
+          .add("r1", new ServiceRunnable()).noLocalFiles()
+          .add("r2", new ServiceRunnable()).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * A Runnable that announces a service and waits for an announcement from another instance of the same service.
+   */
+  public static final class ServiceRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
+    private static final String SERVICE_NAME = "service";
+    private volatile Thread runThread;
+
+    @Override
+    public void run() {
+      this.runThread = Thread.currentThread();
+      final int port = Integer.parseInt(getContext().getArguments()[0]);
+      getContext().announce(SERVICE_NAME, port);
+
+      final CountDownLatch discoveredLatch = new CountDownLatch(1);
+
+      ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME);
+      serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+        @Override
+        public void onChange(ServiceDiscovered serviceDiscovered) {
+          // Try to find a discoverable that is not this instance
+          for (Discoverable discoverable : serviceDiscovered) {
+            int discoveredPort = discoverable.getSocketAddress().getPort();
+            if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) {
+              LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort);
+              discoveredLatch.countDown();
+            }
+          }
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+
+      try {
+        discoveredLatch.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted.", e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/875fbca9/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 56fb6a5..51b6abf 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -33,7 +33,8 @@ import org.junit.runners.Suite;
                       FailureRestartTestRun.class,
                       ProvisionTimeoutTestRun.class,
                       LogHandlerTestRun.class,
-                      SessionExpireTestRun.class
+                      SessionExpireTestRun.class,
+                      ServiceDiscoveryTest.class
                     })
 public final class YarnTestSuite {