You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:57 UTC
[15/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-core/src/test/resources/logback-test.xml b/twill-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..3c36660
--- /dev/null
+++ b/twill-core/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Logback configuration used when running the twill-core tests -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
new file mode 100644
index 0000000..e41b214
--- /dev/null
+++ b/twill-discovery-api/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-discovery-api</artifactId>
+ <name>Twill discovery service API</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
new file mode 100644
index 0000000..a5529fe
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Discoverable defines the attributes of a service to be discovered.
+ */
+public interface Discoverable {
+
+ /**
+ * @return Name of the service
+ */
+ String getName();
+
+ /**
+ * @return An {@link InetSocketAddress} representing the host+port of the service.
+ */
+ InetSocketAddress getSocketAddress();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
new file mode 100644
index 0000000..a26fff8
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * DiscoveryService defines the interface for registering {@link Discoverable} services.
+ */
+public interface DiscoveryService {
+
+ /**
+ * Registers a {@link Discoverable} service.
+ * @param discoverable Information of the service provider that could be discovered.
+ * @return A {@link Cancellable} for un-registration.
+ */
+ Cancellable register(Discoverable discoverable);
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
new file mode 100644
index 0000000..89cf269
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+/**
+ * Client interface for discovering services registered with {@link DiscoveryService}.
+ */
+public interface DiscoveryServiceClient {
+
+ /**
+ * Retrieves a list of {@link Discoverable} for the service with the given name.
+ *
+ * @param name Name of the service
+ * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
+ * an {@link java.util.Iterator Iterator} that reflects the latest set of
+ * available {@link Discoverable} services.
+ */
+ Iterable<Discoverable> discover(String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
new file mode 100644
index 0000000..2612138
--- /dev/null
+++ b/twill-discovery-core/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-discovery-core</artifactId>
+ <name>Twill discovery service implementations</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
new file mode 100644
index 0000000..5fa97d1
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Immutable snapshot of the name and socket address of a {@link Discoverable}, with value-based equality.
+ */
+final class DiscoverableWrapper implements Discoverable {
+ private final String name;
+ private final InetSocketAddress address;
+
+ DiscoverableWrapper(Discoverable discoverable) {
+ this.name = discoverable.getName();
+ this.address = discoverable.getSocketAddress();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return address;
+ }
+
+ @Override
+ public String toString() {
+ return "{name=" + name + ", address=" + address + "}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ // Safe cast: the getClass() check above guarantees that o is a DiscoverableWrapper.
+ Discoverable other = (Discoverable) o;
+
+ return name.equals(other.getName()) && address.equals(other.getSocketAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
new file mode 100644
index 0000000..7a9e984
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A simple in-memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ */
+public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+ // Registered services keyed by service name; every access below happens while holding the lock.
+ private final Multimap<String, Discoverable> services = HashMultimap.create();
+ private final Lock lock = new ReentrantLock();
+
+ @Override
+ public Cancellable register(final Discoverable discoverable) {
+ lock.lock();
+ try {
+ // Store a snapshot of the name and address instead of the caller's object.
+ final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+ services.put(wrapper.getName(), wrapper);
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ lock.lock();
+ try {
+ services.remove(wrapper.getName(), wrapper);
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Iterable<Discoverable> discover(final String name) {
+ return new Iterable<Discoverable>() {
+ @Override
+ public Iterator<Discoverable> iterator() {
+ lock.lock();
+ try {
+ // Iterate over an immutable copy taken under the lock, so later changes don't affect this iterator.
+ return ImmutableList.copyOf(services.get(name)).iterator();
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
new file mode 100644
index 0000000..e2f9bc0
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ * <p>
+ * Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
+ * If you would like to change the namespace under which the services are registered then you can pass
+ * in the namespace during construction of {@link ZKDiscoveryService}.
+ * </p>
+ *
+ * <p>
+ * Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
+ * and also for discovering the registered services.
+ * <blockquote>
+ * <pre>
+ * {@code
+ *
+ * ZKDiscoveryService service = new ZKDiscoveryService(zkClient);
+ * service.register(new Discoverable() {
+ * @Override
+ * public String getName() {
+ * return "service-name";
+ * }
+ *
+ * @Override
+ * public InetSocketAddress getSocketAddress() {
+ * return new InetSocketAddress(hostname, port);
+ * }
+ * });
+ * ...
+ * ...
+ * Iterable<Discoverable> services = service.discover("service-name");
+ * ...
+ * }
+ * </pre>
+ * </blockquote>
+ * </p>
+ */
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
+ private static final String NAMESPACE = "/discoverable";
+
+ private static final long RETRY_MILLIS = 1000;
+
+ // In-memory map for recreating ephemeral nodes after session expires.
+ // It maps from discoverable to the corresponding Cancellable
+ private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
+ private final Lock lock;
+
+ private final LoadingCache<String, Iterable<Discoverable>> services;
+ private final ZKClient zkClient;
+ private final ScheduledExecutorService retryExecutor;
+
+ /**
+ * Constructs ZKDiscoveryService that stores the service registry under the default "/discoverable" namespace.
+ * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+ */
+ public ZKDiscoveryService(ZKClient zkClient) {
+ this(zkClient, NAMESPACE);
+ }
+
+ /**
+ * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry under namespace.
+ * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+ * @param namespace under which the service registered would be stored in zookeeper.
+ * If namespace is {@code null}, no namespace will be used.
+ */
+ public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+ this.discoverables = HashMultimap.create();
+ this.lock = new ReentrantLock();
+ this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("zk-discovery-retry"));
+ this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
+ this.services = CacheBuilder.newBuilder().build(createServiceLoader());
+ this.zkClient.addConnectionWatcher(createConnectionWatcher());
+ }
+
+ /**
+ * Registers a {@link Discoverable} in zookeeper and blocks until the registration completes.
+ * <p>
+ * Registering a {@link Discoverable} creates an ephemeral node under {@code /<service-name>} in zookeeper.
+ * If the node already exists, the creation is retried unless the existing node is not ephemeral (failure)
+ * or is already owned by this client's session (treated as success). Any other failure is rethrown as an
+ * unchecked exception, so that a service with an intent to register is not started without registering;
+ * the expectation is that the failed process would be started again by the monitoring service.
+ * </p>
+ * @param discoverable Information of the service provider that could be discovered.
+ * @return A {@link Cancellable} that removes the registration when cancelled.
+ */
+ @Override
+ public Cancellable register(final Discoverable discoverable) {
+ final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+ final SettableFuture<String> future = SettableFuture.create();
+ final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+
+ // Create the zk ephemeral node.
+ Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // Set the created node path to cancellable for future cancellation.
+ cancellable.setPath(result);
+ lock.lock();
+ try {
+ discoverables.put(wrapper, cancellable);
+ } finally {
+ lock.unlock();
+ }
+ LOG.debug("Service registered: {} {}", wrapper, result);
+ future.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof KeeperException.NodeExistsException) {
+ handleRegisterFailure(wrapper, future, this, t);
+ } else {
+ LOG.warn("Failed to register: {}", wrapper, t);
+ future.setException(t);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ // Block until the registration either completes or fails; a failure surfaces as an unchecked exception.
+ Futures.getUnchecked(future);
+ return cancellable;
+ }
+
+ @Override
+ public Iterable<Discoverable> discover(String service) {
+ return services.getUnchecked(service); // one live Iterable is cached per service name
+ }
+
+ /**
+ * Handles a registration that failed because the node already exists: retries the creation unless the
+ * existing node is not ephemeral (registration fails) or is owned by this client's session (treated as done).
+ * @param discoverable The discoverable to register.
+ * @param completion A settable future to set when registration is completed / failed.
+ * @param creationCallback A future callback for path creation.
+ * @param failureCause The original cause of failure.
+ */
+ private void handleRegisterFailure(final Discoverable discoverable,
+ final SettableFuture<String> completion,
+ final FutureCallback<String> creationCallback,
+ final Throwable failureCause) {
+
+ final String path = getNodePath(discoverable);
+ Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ if (result == null) {
+ // If the node is gone, simply retry.
+ LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
+ retryRegister(discoverable, creationCallback);
+ return;
+ }
+
+ long ephemeralOwner = result.getEphemeralOwner();
+ if (ephemeralOwner == 0) {
+ // it is not an ephemeral node, something wrong.
+ LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
+ path, discoverable);
+ completion.setException(failureCause);
+ return;
+ }
+ Long sessionId = zkClient.getSessionId();
+ if (sessionId == null || ephemeralOwner != sessionId) {
+ // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
+ LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
+ retryRegister(discoverable, creationCallback);
+ } else {
+ // This client owned the node, treat the registration as completed.
+ // This could happen if same client tries to register twice (due to mistake or failure race condition).
+ completion.set(path); // NOTE(review): bypasses creationCallback, so the cancellable gets no path - confirm
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If exists call failed, simply retry creation. The cause is logged so that repeated retries can be diagnosed.
+ LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable, t);
+ retryRegister(discoverable, creationCallback);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ private OperationFuture<String> doRegister(Discoverable discoverable) {
+ byte[] discoverableBytes = encode(discoverable); // JSON (UTF-8) form of the discoverable, stored as node data
+ return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true); // NOTE(review): 'true' is presumably createParent - confirm against ZKClient
+ }
+
+ private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+ retryExecutor.schedule(new Runnable() {
+ // Scheduled with a delay of RETRY_MILLIS; re-attempts the node creation and reuses the same callback.
+ @Override
+ public void run() {
+ Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Generates the node path {@code /<name>/<md5 of IP address bytes and port>} for a given {@link Discoverable}.
+ * @param discoverable An instance of {@link Discoverable}; its socket address must be resolved.
+ * @return The node path based on the discoverable's name and socket address.
+ */
+ private String getNodePath(Discoverable discoverable) {
+ InetSocketAddress socketAddress = discoverable.getSocketAddress();
+ String node = Hashing.md5()
+ .newHasher()
+ .putBytes(socketAddress.getAddress().getAddress()) // getAddress() is null for an unresolved address
+ .putInt(socketAddress.getPort())
+ .hash().toString();
+
+ return String.format("/%s/%s", discoverable.getName(), node);
+ }
+
+ private Watcher createConnectionWatcher() {
+ return new Watcher() {
+ // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
+ private boolean expired;
+ // 'expired' is set on session expiry and cleared on the next SyncConnected event, which re-registers services.
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.Expired) {
+ LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
+ expired = true;
+ } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
+ LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
+ expired = false;
+
+ // Re-register all services that are still in the in-memory map.
+ lock.lock();
+ try {
+ for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+ LOG.info("Re-registering service: {}", entry.getKey());
+
+ // Must be non-blocking in here.
+ Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // Updates the cancellable to the newly created ephemeral node.
+ entry.getValue().setPath(result);
+ LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
+ entry.getValue().setPath(null);
+ LOG.error("Failed to re-register service: {}", entry.getKey(), t);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Creates a CacheLoader that builds a live Iterable reflecting instance changes of a given service.
+ */
+ private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
+ return new CacheLoader<String, Iterable<Discoverable>>() {
+ @Override
+ public Iterable<Discoverable> load(String service) throws Exception {
+ // The atomic reference is to keep the resulting Iterable live. It always contains an
+ // immutable snapshot of the latest detected set of Discoverable.
+ final AtomicReference<Iterable<Discoverable>> iterable =
+ new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+ final String serviceBase = "/" + service;
+
+ // Watch for children changes in /service
+ ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+ @Override
+ public void updated(NodeChildren nodeChildren) {
+ // Fetch data of all children nodes in parallel.
+ List<String> children = nodeChildren.getChildren();
+ List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
+ for (String child : children) {
+ dataFutures.add(zkClient.getData(serviceBase + "/" + child));
+ }
+
+ // Update the service map when all fetching are done.
+ final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
+ fetchFuture.addListener(new Runnable() {
+ @Override
+ public void run() {
+ ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
+ for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
+ // For successful fetch, decode the content. Failed fetches show up as null and are skipped.
+ if (nodeData != null) {
+ Discoverable discoverable = decode(nodeData.getData());
+ if (discoverable != null) {
+ builder.add(discoverable);
+ }
+ }
+ }
+ iterable.set(builder.build());
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ });
+ // Each iterator() call reads whatever snapshot the watch callback has published most recently.
+ return new Iterable<Discoverable>() {
+ @Override
+ public Iterator<Discoverable> iterator() {
+ return iterable.get().iterator();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Static helper function for decoding array of bytes (UTF-8 JSON) into a {@link Discoverable} object.
+ * @param bytes representing a serialized {@link Discoverable}
+ * @return null if bytes are null; else an instance of {@link Discoverable}
+ */
+ private static Discoverable decode(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ String content = new String(bytes, Charsets.UTF_8);
+ return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
+ .create()
+ .fromJson(content, Discoverable.class);
+ }
+
+ /**
+ * Static helper function for encoding an instance of {@link Discoverable} into array of bytes (UTF-8 JSON).
+ * @param discoverable An instance of {@link Discoverable}
+ * @return array of bytes representing an instance of <code>discoverable</code>
+ */
+ private static byte[] encode(Discoverable discoverable) {
+ return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
+ .create()
+ .toJson(discoverable, DiscoverableWrapper.class)
+ .getBytes(Charsets.UTF_8);
+ }
+
+ /**
+ * Inner class for cancelling (un-registering) a registered discoverable.
+ */
+ private final class DiscoveryCancellable implements Cancellable {
+
+ private final Discoverable discoverable;
+ private final AtomicBoolean cancelled;
+ private volatile String path;
+
+ DiscoveryCancellable(Discoverable discoverable) {
+ this.discoverable = discoverable;
+ this.cancelled = new AtomicBoolean();
+ }
+
+ /**
+ * Set the zk node path representing the ephemeral node of this registered discoverable.
+ * Called from ZK event thread when creating of the node completed, either from normal registration or
+ * re-registration due to session expiration.
+ *
+ * @param path The path to the ephemeral node, or {@code null} if re-registration failed.
+ */
+ void setPath(String path) {
+ this.path = path;
+ if (cancelled.get() && path != null) {
+ // Simply delete the path if it's already cancelled
+ // It's for the case when session expire happened and re-registration completed after this has been cancelled.
+ // Not bother with the result as if there is error, nothing much we could do.
+ zkClient.delete(path);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (!cancelled.compareAndSet(false, true)) {
+ return;
+ }
+
+ // Take a snapshot of the volatile path.
+ String path = this.path;
+
+ // If it is null, meaning cancel() is called before the ephemeral node is created, hence
+ // setPath() will be called in future (through zk callback when creation is completed)
+ // so that deletion will be done in setPath().
+ if (path == null) {
+ return;
+ }
+ // NOTE(review): the early return above leaves this entry in the discoverables map - confirm that is acceptable.
+ // Remove this Cancellable from the map so that upon session expiration won't try to register.
+ lock.lock();
+ try {
+ discoverables.remove(discoverable, this);
+ } finally {
+ lock.unlock();
+ }
+
+ // Delete the path. It's ok if the path does not exist
+ // (e.g. when session expired and before node has been re-created)
+ Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
+ KeeperException.NoNodeException.class, path));
+ LOG.debug("Service unregistered: {} {}", discoverable, path);
+ }
+ }
+
+ /**
+ * SerDe for converting a {@link Discoverable} into a JSON object with "service", "hostname" and "port"
+ * properties, or from such a JSON object into a {@link Discoverable}.
+ */
+ private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+ @Override
+ public Discoverable deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ final String service = jsonObj.get("service").getAsString();
+ String hostname = jsonObj.get("hostname").getAsString();
+ int port = jsonObj.get("port").getAsInt();
+ final InetSocketAddress address = new InetSocketAddress(hostname, port);
+ return new Discoverable() {
+ @Override
+ public String getName() {
+ return service;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return address;
+ }
+ };
+ }
+
+ @Override
+ public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.addProperty("service", src.getName());
+ jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
+ jsonObj.addProperty("port", src.getSocketAddress().getPort());
+ return jsonObj;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
new file mode 100644
index 0000000..a1d6e0c
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package provide service discovery implementations.
+ */
+package org.apache.twill.discovery;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
new file mode 100644
index 0000000..d8cc375
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the in-memory discovery service: a registered endpoint must be discoverable and must
+ * disappear again once its registration is cancelled.
+ */
+public class InMemoryDiscoveryServiceTest {
+
+  /**
+   * Registers an endpoint with the given discovery service.
+   *
+   * @param service the discovery service to register with
+   * @param name name of the service being announced
+   * @param host host name of the endpoint
+   * @param port port of the endpoint
+   * @return a {@link Cancellable} that removes the registration
+   */
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    DiscoveryService discoveryService = new InMemoryDiscoveryService();
+    // The same instance is used for both registration and lookup.
+    DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
+
+    // Register one service running on one host:port
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+    // Discover that registered host:port.
+    // assertEquals reports the actual count on failure, unlike assertTrue(size == 1).
+    Assert.assertEquals(1, Iterables.size(discoverables));
+
+    // Remove the service
+    cancellable.cancel();
+
+    // There should be no service.
+    discoverables = discoveryServiceClient.discover("foo");
+    // Brief pause before checking; presumably gives the cancellation time to take effect.
+    TimeUnit.MILLISECONDS.sleep(100);
+    Assert.assertEquals(0, Iterables.size(discoverables));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
new file mode 100644
index 0000000..feee8db
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the ZooKeeper based discovery service against an in-memory ZooKeeper server that is
+ * started once for the whole class.
+ */
+public class ZKDiscoveryServiceTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
+
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClient;
+
+  /**
+   * Starts the in-memory ZooKeeper server and the client shared by all tests.
+   */
+  @BeforeClass
+  public static void beforeClass() {
+    zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
+    zkServer.startAndWait();
+
+    zkClient = createZKClient();
+    zkClient.startAndWait();
+  }
+
+  /**
+   * Stops the shared client first and then the server.
+   */
+  @AfterClass
+  public static void afterClass() {
+    Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
+  }
+
+  /**
+   * Builds a (not yet started) ZooKeeper client connected to the test server. The client
+   * re-establishes watches on session expiry and retries failed operations with a fixed
+   * one second delay.
+   *
+   * @return a new, unstarted {@link ZKClientService}
+   */
+  private static ZKClientService createZKClient() {
+    return ZKClientServices.delegate(
+      ZKClients.retryOnFailure(
+        ZKClients.reWatchOnExpire(
+          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+  }
+
+  /**
+   * Registers an endpoint with the given discovery service.
+   *
+   * @param service the discovery service to register with
+   * @param name name of the service being announced
+   * @param host host name of the endpoint
+   * @param port port of the endpoint
+   * @return a {@link Cancellable} that removes the registration
+   */
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+  /**
+   * Polls the given live view until it contains the expected number of entries.
+   * Checks up to ten times, sleeping 10 ms before each check, then performs one final check.
+   *
+   * @param expected the number of discoverables to wait for
+   * @param discoverables the discovery result to poll
+   * @return {@code true} if the expected size was observed, {@code false} otherwise
+   */
+  private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      TimeUnit.MILLISECONDS.sleep(10);
+      if (Iterables.size(discoverables) == expected) {
+        return true;
+      }
+    }
+    return (Iterables.size(discoverables) == expected);
+  }
+
+  @Test (timeout = 5000)
+  public void testDoubleRegister() throws Exception {
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+    // Register on the same host port, it shouldn't fail.
+    Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+    Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // Register again with two different clients, but killing session of the first one.
+    final ZKClientService zkClient2 = createZKClient();
+    zkClient2.startAndWait();
+
+    try {
+      ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+      cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+      // Schedule a thread to shutdown zkClient2.
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            TimeUnit.SECONDS.sleep(2);
+            zkClient2.stopAndWait();
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+            // Restore the interrupt status instead of silently dropping it.
+            Thread.currentThread().interrupt();
+          }
+        }
+      }.start();
+
+      // This call would block until zkClient2 is shutdown.
+      cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+      cancellable.cancel();
+
+    } finally {
+      zkClient2.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testSessionExpires() throws Exception {
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+    Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
+
+    // Discover that registered host:port.
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
+
+    // Register one more endpoint to make sure state has been reflected after reconnection
+    Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
+
+    // Reconnection would trigger re-registration.
+    Assert.assertTrue(waitTillExpected(2, discoverables));
+
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // Verify that both are now gone.
+    Assert.assertTrue(waitTillExpected(0, discoverables));
+  }
+
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    // Separate service instances for registering and discovering, sharing the same ZK client.
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    // Register one service running on one host:port
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+    // Discover that registered host:port.
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    // Remove the service
+    cancellable.cancel();
+
+    // There should be no service.
+
+    discoverables = discoveryServiceClient.discover("foo");
+
+    Assert.assertTrue(waitTillExpected(0, discoverables));
+  }
+
+  @Test
+  public void manySameDiscoverable() throws Exception {
+    List<Cancellable> cancellables = Lists.newArrayList();
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
+    Assert.assertTrue(waitTillExpected(5, discoverables));
+
+    // Cancel one registration at a time; the same result object must shrink accordingly.
+    for (int i = 0; i < 5; i++) {
+      cancellables.get(i).cancel();
+      Assert.assertTrue(waitTillExpected(4 - i, discoverables));
+    }
+  }
+
+  @Test
+  public void multiServiceDiscoverable() throws Exception {
+    List<Cancellable> cancellables = Lists.newArrayList();
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    cancellables.add(register(discoveryService, "service1", "localhost", 1));
+    cancellables.add(register(discoveryService, "service1", "localhost", 2));
+    cancellables.add(register(discoveryService, "service1", "localhost", 3));
+    cancellables.add(register(discoveryService, "service1", "localhost", 4));
+    cancellables.add(register(discoveryService, "service1", "localhost", 5));
+
+    cancellables.add(register(discoveryService, "service2", "localhost", 1));
+    cancellables.add(register(discoveryService, "service2", "localhost", 2));
+    cancellables.add(register(discoveryService, "service2", "localhost", 3));
+
+    cancellables.add(register(discoveryService, "service3", "localhost", 1));
+    cancellables.add(register(discoveryService, "service3", "localhost", 2));
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
+    Assert.assertTrue(waitTillExpected(5, discoverables));
+
+    discoverables = discoveryServiceClient.discover("service2");
+    Assert.assertTrue(waitTillExpected(3, discoverables));
+
+    discoverables = discoveryServiceClient.discover("service3");
+    Assert.assertTrue(waitTillExpected(2, discoverables));
+
+    cancellables.add(register(discoveryService, "service3", "localhost", 3));
+    Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
+
+    for (Cancellable cancellable : cancellables) {
+      cancellable.cancel();
+    }
+
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/resources/logback-test.xml b/twill-discovery-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-discovery-core/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Logback configuration for the twill-discovery-core tests (lives under src/test/resources) -->
+<configuration>
+ <!-- Single appender: everything goes to the console -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <!-- Layout: ISO8601 timestamp, level, [thread:class@line], message -->
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Twill's own classes log at DEBUG; everything else falls back to the WARN root level below -->
+ <logger name="org.apache.twill" level="DEBUG" />
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
new file mode 100644
index 0000000..b11bc7a
--- /dev/null
+++ b/twill-yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-yarn</artifactId>
+ <name>Twill Apache Hadoop YARN library</name>
+
+ <properties>
+ <output.dir>target/classes</output.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>${output.dir}</outputDirectory>
+ </build>
+
+ <!--
+ Profiles selecting the Hadoop version to build against.
+ NOTE(review): hadoop20.output.dir is not defined in this file; presumably it comes from
+ the parent pom. Confirm before changing.
+ -->
+ <profiles>
+ <profile>
+ <id>hadoop-2.0</id>
+ <!-- Redirects the build output (output.dir, target/classes by default) to hadoop20.output.dir -->
+ <properties>
+ <output.dir>${hadoop20.output.dir}</output.dir>
+ </properties>
+ </profile>
+ <profile>
+ <id>hadoop-2.1</id>
+ <build>
+ <!-- Adds the contents of hadoop20.output.dir as resources next to src/main/resources -->
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop-2.2</id>
+ <build>
+ <!-- Same resource setup as the hadoop-2.1 profile -->
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * {@link YarnAMClient} implementation for Hadoop 2.0. It talks to the resource manager through
+ * Twill's own {@link AMRMClient} port and hands allocated containers to the caller as
+ * {@link ProcessLauncher}s.
+ * <p>
+ * {@link #setTracker(InetSocketAddress, URL)} must be called before the service is started.
+ * The resource capabilities used for adjusting container requests are only known after start up.
+ * </p>
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+  static {
+    // Wraps a YARN container status into the version independent Twill representation.
+    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+  }
+
+  private final ContainerId containerId;
+  // Outstanding container requests, keyed by the id returned from ContainerRequestBuilder.apply().
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  // Both capabilities are populated in startUp() from the registration response.
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  /**
+   * Creates the client for the application master container named by the
+   * {@link ApplicationConstants#AM_CONTAINER_ID_ENV} environment variable.
+   *
+   * @param conf the Hadoop configuration used to initialize the underlying clients
+   * @throws IllegalArgumentException if the container id is missing from the environment
+   */
+  public Hadoop20YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  /**
+   * Starts the resource manager client and registers the application master, remembering the
+   * minimum and maximum resource capabilities reported in the registration response.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  /**
+   * Unregisters the application master with a {@code SUCCEEDED} status and stops the client.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    amrmClient.stop();
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate call. Newly allocated containers are passed to
+   * {@link AllocateHandler#acquired}, and any container that was not used for launching a process
+   * is released afterwards. Completed container statuses are passed to
+   * {@link AllocateHandler#completed}.
+   *
+   * @param progress the progress value reported with the allocate call
+   * @param handler callback receiving acquired launchers and completed statuses
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Creates a builder for requesting {@code count} containers. The capability is adjusted to the
+   * cluster limits first. Calling {@code apply()} on the builder submits one request per container
+   * and returns an id that can be passed to {@link #completeContainerRequest(String)}.
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          // Empty host/rack constraints are passed on as null.
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  /**
+   * Withdraws all container requests that were submitted under the given id.
+   */
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Adjusts the given resource in place so that it fits the capabilities reported by the
+   * resource manager. Virtual cores are clamped to the [min, max] range. Memory is capped at the
+   * maximum and then rounded up to a multiple of the minimum.
+   *
+   * @param resource the requested resource; it is modified and returned
+   * @return the same {@code resource} instance after adjustment
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try and set the virtual cores, which older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Capture the requested value before mutating the resource so the log shows the real change.
+    int requestedMemory = resource.getMemory();
+    int updatedMemory = Math.min(requestedMemory, maxCapability.getMemory());
+    int minMemory = minCapability.getMemory();
+    // Skip the rounding when no positive minimum is reported; dividing by zero would
+    // otherwise collapse the request to 0 MB.
+    if (minMemory > 0) {
+      updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+    }
+
+    if (requestedMemory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", requestedMemory, updatedMemory);
+    }
+
+    return resource;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
new file mode 100644
index 0000000..bfec34e
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ *
+ */
+public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
+ private final YarnClient yarnClient;
+ private String user;
+
+ public Hadoop20YarnAppClient(Configuration configuration) {
+ this.yarnClient = new YarnClientImpl();
+ yarnClient.init(configuration);
+ this.user = System.getProperty("user.name");
+ }
+
+ @Override
+ public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+ // Request for new application
+ final GetNewApplicationResponse response = yarnClient.getNewApplication();
+ final ApplicationId appId = response.getApplicationId();
+
+ // Setup the context for application submission
+ final ApplicationSubmissionContext appSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
+ appSubmissionContext.setApplicationId(appId);
+ appSubmissionContext.setApplicationName(twillSpec.getName());
+ appSubmissionContext.setUser(user);
+
+ ApplicationSubmitter submitter = new ApplicationSubmitter() {
+
+ @Override
+ public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+ ContainerLaunchContext context = launchContext.getLaunchContext();
+ addRMToken(context);
+ context.setUser(appSubmissionContext.getUser());
+ context.setResource(adjustMemory(response, capability));
+ appSubmissionContext.setAMContainerSpec(context);
+
+ try {
+ yarnClient.submitApplication(appSubmissionContext);
+ return new ProcessControllerImpl(yarnClient, appId);
+ } catch (YarnRemoteException e) {
+ LOG.error("Failed to submit application {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+ };
+
+ return new ApplicationMasterProcessLauncher(appId, submitter);
+ }
+
+ private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+ int minMemory = response.getMinimumResourceCapability().getMemory();
+
+ int updatedMemory = Math.min(capability.getMemory(), response.getMaximumResourceCapability().getMemory());
+ updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+ if (updatedMemory != capability.getMemory()) {
+ capability.setMemory(updatedMemory);
+ }
+
+ return capability;
+ }
+
+ private void addRMToken(ContainerLaunchContext context) {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ try {
+ Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
+
+ Configuration config = yarnClient.getConfig();
+ Token<TokenIdentifier> token = convertToken(
+ yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
+ YarnUtils.getRMAddress(config));
+
+ LOG.info("Added RM delegation token {}", token);
+ credentials.addToken(token.getService(), token);
+
+ context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+
+ } catch (Exception e) {
+ LOG.error("Fails to create credentials.", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
+ Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+ protoToken.getPassword().array(),
+ new Text(protoToken.getKind()),
+ new Text(protoToken.getService()));
+ if (serviceAddr != null) {
+ SecurityUtil.setTokenService(token, serviceAddr);
+ }
+ return token;
+ }
+
+ @Override
+ public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+ this.user = user;
+ return createLauncher(twillSpec);
+ }
+
+ @Override
+ public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+ return new ProcessControllerImpl(yarnClient, appId);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ yarnClient.start();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ yarnClient.stop();
+ }
+
+ private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+ private final YarnClient yarnClient;
+ private final ApplicationId appId;
+
+ public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+ this.yarnClient = yarnClient;
+ this.appId = appId;
+ }
+
+ @Override
+ public YarnApplicationReport getReport() {
+ try {
+ return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
+ } catch (YarnRemoteException e) {
+ LOG.error("Failed to get application report {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ try {
+ yarnClient.killApplication(appId);
+ } catch (YarnRemoteException e) {
+ LOG.error("Failed to kill application {}", appId, e);
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
new file mode 100644
index 0000000..6c1b764
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ *
+ */
+public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
+
+ private final ApplicationReport report;
+
+ public Hadoop20YarnApplicationReport(ApplicationReport report) {
+ this.report = report;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ return report.getApplicationId();
+ }
+
+ @Override
+ public ApplicationAttemptId getCurrentApplicationAttemptId() {
+ return report.getCurrentApplicationAttemptId();
+ }
+
+ @Override
+ public String getQueue() {
+ return report.getQueue();
+ }
+
+ @Override
+ public String getName() {
+ return report.getName();
+ }
+
+ @Override
+ public String getHost() {
+ return report.getHost();
+ }
+
+ @Override
+ public int getRpcPort() {
+ return report.getRpcPort();
+ }
+
+ @Override
+ public YarnApplicationState getYarnApplicationState() {
+ return report.getYarnApplicationState();
+ }
+
+ @Override
+ public String getDiagnostics() {
+ return report.getDiagnostics();
+ }
+
+ @Override
+ public String getTrackingUrl() {
+ return report.getTrackingUrl();
+ }
+
+ @Override
+ public String getOriginalTrackingUrl() {
+ return report.getOriginalTrackingUrl();
+ }
+
+ @Override
+ public long getStartTime() {
+ return report.getStartTime();
+ }
+
+ @Override
+ public long getFinishTime() {
+ return report.getFinishTime();
+ }
+
+ @Override
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return report.getFinalApplicationStatus();
+ }
+
+ @Override
+ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+ return report.getApplicationResourceUsageReport();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
new file mode 100644
index 0000000..79b2cb5
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A {@link YarnContainerInfo} view over a YARN {@link Container}. Every accessor reads straight
+ * from the wrapped container.
+ */
+public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
+
+  private final Container container;
+
+  /**
+   * @param container the YARN container to wrap; stored as given
+   */
+  public Hadoop20YarnContainerInfo(Container container) {
+    this.container = container;
+  }
+
+  /**
+   * Returns the wrapped {@link Container}, cast to the type the caller expects.
+   *
+   * <p>The cast cannot be checked here because of erasure. A caller asking for a type the
+   * container is not assignable to gets a {@link ClassCastException} at its own call site.
+   */
+  @Override
+  @SuppressWarnings("unchecked") // The interface's generic return type makes this cast unavoidable.
+  public <T> T getContainer() {
+    return (T) container;
+  }
+
+  @Override
+  public String getId() {
+    return container.getId().toString();
+  }
+
+  /**
+   * Resolves the host name of the container's node through {@link InetAddress#getByName(String)}.
+   * An {@link UnknownHostException} is rethrown via {@link Throwables#propagate(Throwable)}.
+   */
+  @Override
+  public InetAddress getHost() {
+    try {
+      return InetAddress.getByName(container.getNodeId().getHost());
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return container.getNodeId().getPort();
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return container.getResource().getMemory();
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return YarnUtils.getVirtualCores(container.getResource());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
new file mode 100644
index 0000000..cc61856
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * A {@link YarnContainerStatus} that wraps a YARN {@link ContainerStatus} and forwards each
+ * accessor to it.
+ */
+public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
+
+  // The wrapped status; stored as given.
+  private final ContainerStatus status;
+
+  /**
+   * @param containerStatus the YARN container status to expose
+   */
+  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
+    status = containerStatus;
+  }
+
+  /**
+   * Returns the string form of the wrapped status's container id.
+   */
+  @Override
+  public String getContainerId() {
+    return status.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return status.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return status.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return status.getDiagnostics();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
new file mode 100644
index 0000000..b1f6d66
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link YarnLaunchContext} that wraps a YARN {@link ContainerLaunchContext} record created
+ * through {@link Records#newRecord(Class)}. Setters and getters forward to that record.
+ */
+public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
+
+  // Unwraps a YarnLocalResource into the LocalResource it carries.
+  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM =
+    new Function<YarnLocalResource, LocalResource>() {
+      @Override
+      public LocalResource apply(YarnLocalResource input) {
+        return input.getLocalResource();
+      }
+    };
+
+  private final ContainerLaunchContext launchContext;
+
+  public Hadoop20YarnLaunchContext() {
+    launchContext = Records.newRecord(ContainerLaunchContext.class);
+  }
+
+  /**
+   * Returns the wrapped {@link ContainerLaunchContext}, cast to the type the caller expects.
+   *
+   * <p>The cast cannot be checked here because of erasure. A caller asking for an incompatible
+   * type gets a {@link ClassCastException} at its own call site.
+   */
+  @Override
+  @SuppressWarnings("unchecked") // The interface's generic return type makes this cast unavoidable.
+  public <T> T getLaunchContext() {
+    return (T) launchContext;
+  }
+
+  /**
+   * Encodes the credentials with {@code YarnUtils.encodeCredentials} and stores the result as
+   * the container tokens.
+   */
+  @Override
+  public void setCredentials(Credentials credentials) {
+    launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+  }
+
+  /**
+   * Sets the local resources, unwrapping each {@link YarnLocalResource} to its
+   * {@link LocalResource}.
+   */
+  @Override
+  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+    // Maps.transformValues yields a lazily transformed view of the given map, not a copy.
+    // NOTE(review): whether the record copies the entries on set is not visible here - confirm.
+    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+  }
+
+  @Override
+  public void setServiceData(Map<String, ByteBuffer> serviceData) {
+    launchContext.setServiceData(serviceData);
+  }
+
+  @Override
+  public Map<String, String> getEnvironment() {
+    return launchContext.getEnvironment();
+  }
+
+  @Override
+  public void setEnvironment(Map<String, String> environment) {
+    launchContext.setEnvironment(environment);
+  }
+
+  @Override
+  public List<String> getCommands() {
+    return launchContext.getCommands();
+  }
+
+  @Override
+  public void setCommands(List<String> commands) {
+    launchContext.setCommands(commands);
+  }
+
+  @Override
+  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+    launchContext.setApplicationACLs(acls);
+  }
+}