You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:50 UTC
[08/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
new file mode 100644
index 0000000..6dcd1a7
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Collection of helper methods for common operations that are usually needed when interacting with ZooKeeper.
+ */
+public final class ZKOperations {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKOperations.class);
+
+ /**
+ * Represents a ZK operation updates callback.
+ * The {@link #updated(Object)} method receives the updated data.
+ * @param <T> Type of updated data.
+ */
+ public interface Callback<T> {
+ void updated(T data);
+ }
+
+ /**
+ * Interface for defining callback method to receive node data updates.
+ */
+ public interface DataCallback extends Callback<NodeData> {
+ /**
+ * Invoked when data of the node changed.
+ * NOTE(review): the watch helper in this class only invokes the callback after a successful read and
+ * handles a missing node by waiting for it to be created, so confirm whether {@code null} is ever delivered.
+ * @param nodeData New data of the node, or {@code null} if the node has been deleted.
+ */
+ @Override
+ void updated(NodeData nodeData);
+ }
+
+ /**
+ * Interface for defining callback method to receive children nodes updates.
+ * The {@link #updated(NodeChildren)} method receives the result of the latest children listing.
+ */
+ public interface ChildrenCallback extends Callback<NodeChildren> {
+ @Override
+ void updated(NodeChildren nodeChildren);
+ }
+
+ /**
+ * Abstraction of a watchable ZooKeeper read that the {@code watchChanges} helper executes repeatedly.
+ * @param <T> Type of the operation result.
+ */
+ private interface Operation<T> {
+ ZKClient getZKClient(); // Client used to watch for node creation when the operation fails with NONODE.
+
+ OperationFuture<T> exec(String path, Watcher watcher); // Performs the read on the path and sets the watcher.
+ }
+
+ /**
+ * Watches for data changes of the given path. The callback is triggered whenever a change is
+ * detected. Note that the callback won't see every single change, as ZooKeeper does not guarantee that.
+ * If the node doesn't exist, this method watches for its creation and then starts watching for data changes.
+ * When the node is deleted afterwards, the watch waits for the node to be created again and then resumes
+ * watching for data changes.
+ * Cancelling the returned {@link Cancellable} sets a flag that stops further callback invocations and
+ * prevents the watch from being set again on the next event.
+ *
+ * @param zkClient The {@link ZKClient} for the operation
+ * @param path Path to watch
+ * @param callback Callback to be invoked when a data change is detected.
+ * @return A {@link Cancellable} to cancel the watch.
+ */
+ public static Cancellable watchData(final ZKClient zkClient, final String path, final DataCallback callback) {
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+ watchChanges(new Operation<NodeData>() {
+
+ @Override
+ public ZKClient getZKClient() {
+ return zkClient;
+ }
+
+ @Override
+ public OperationFuture<NodeData> exec(String path, Watcher watcher) {
+ return zkClient.getData(path, watcher);
+ }
+ }, path, callback, cancelled);
+
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+ };
+ }
+
+ /**
+ * Watches the given path until it no longer exists.
+ *
+ * @param zkClient The {@link ZKClient} for the operation
+ * @param path Path to watch
+ * @return A {@link ListenableFuture} that is set to the path once the node is observed as deleted or absent,
+ * or fails if the existence check fails.
+ */
+ public static ListenableFuture<String> watchDeleted(final ZKClient zkClient, final String path) {
+ SettableFuture<String> completion = SettableFuture.create();
+ watchDeleted(zkClient, path, completion);
+ return completion;
+ }
+
+ /**
+ * Watches the given path until it no longer exists, reporting the outcome through the given future.
+ * Each watcher event other than a node deletion re-issues the existence check (and sets a new watcher)
+ * as long as the future is not done.
+ *
+ * @param zkClient The {@link ZKClient} for the operation
+ * @param path Path to watch
+ * @param completion Future that is set to the path once the node is deleted or found absent, or failed
+ * with the error of the existence check.
+ */
+ public static void watchDeleted(final ZKClient zkClient, final String path,
+ final SettableFuture<String> completion) {
+
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (!completion.isDone()) {
+ if (event.getType() == Event.EventType.NodeDeleted) {
+ completion.set(path);
+ } else {
+ watchDeleted(zkClient, path, completion); // Not a deletion: check again and set a new watcher.
+ }
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ if (result == null) { // A null Stat is treated as "node absent", so there is nothing to wait for.
+ completion.set(path);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ completion.setException(t);
+ }
+ });
+ }
+
+ /**
+ * Watches for children changes of the given path. The callback receives the children listing each time
+ * it is read successfully. If the node doesn't exist, this method watches for its creation and then starts
+ * watching for children changes.
+ * Cancelling the returned {@link Cancellable} sets a flag that stops further callback invocations and
+ * prevents the watch from being set again on the next event.
+ *
+ * @param zkClient The {@link ZKClient} for the operation
+ * @param path Path to watch
+ * @param callback Callback to be invoked with the children listing.
+ * @return A {@link Cancellable} to cancel the watch.
+ */
+ public static Cancellable watchChildren(final ZKClient zkClient, String path, ChildrenCallback callback) {
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+ watchChanges(new Operation<NodeChildren>() {
+
+ @Override
+ public ZKClient getZKClient() {
+ return zkClient;
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> exec(String path, Watcher watcher) {
+ return zkClient.getChildren(path, watcher);
+ }
+ }, path, callback, cancelled);
+
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+ };
+ }
+
+  /**
+   * Wraps the given future so that a failure of a particular {@link KeeperException} type is turned into a
+   * successful completion carrying {@code errorResult}. A successful result passes through unchanged, a
+   * cancelled source cancels the returned future, and any other failure is propagated as is.
+   *
+   * @param future The source future.
+   * @param exceptionType Type of {@link KeeperException} to be ignored.
+   * @param errorResult Object to be set into the resulting future on a matching exception.
+   * @param <V> Type of the result.
+   * @return A new {@link OperationFuture}.
+   */
+  public static <V> OperationFuture<V> ignoreError(OperationFuture<V> future,
+                                                   final Class<? extends KeeperException> exceptionType,
+                                                   final V errorResult) {
+    final String requestPath = future.getRequestPath();
+    final SettableOperationFuture<V> filtered =
+      SettableOperationFuture.create(requestPath, Threads.SAME_THREAD_EXECUTOR);
+
+    FutureCallback<V> relay = new FutureCallback<V>() {
+      @Override
+      public void onSuccess(V value) {
+        filtered.set(value);
+      }
+
+      @Override
+      public void onFailure(Throwable failure) {
+        // Matching exception type: complete normally with the substitute result.
+        if (exceptionType.isAssignableFrom(failure.getClass())) {
+          filtered.set(errorResult);
+          return;
+        }
+        // Cancellation of the source is mirrored on the returned future.
+        if (failure instanceof CancellationException) {
+          filtered.cancel(true);
+          return;
+        }
+        filtered.setException(failure);
+      }
+    };
+    Futures.addCallback(future, relay, Threads.SAME_THREAD_EXECUTOR);
+
+    return filtered;
+  }
+
+ /**
+ * Deletes the given path recursively. The delete method will keep running until the given path is successfully
+ * removed, which means if there are new node created under the given path while deleting, they'll get deleted
+ * again. If there is {@link KeeperException} during the deletion other than
+ * {@link KeeperException.NotEmptyException} or {@link KeeperException.NoNodeException},
+ * the exception would be reflected in the result future and deletion process will stop,
+ * leaving the given path with intermediate state.
+ *
+ * @param path The path to delete.
+ * @return An {@link OperationFuture} that will be completed when the given path is deleted or bailed due to
+ * exception.
+ */
+ public static OperationFuture<String> recursiveDelete(final ZKClient zkClient, final String path) {
+ final SettableOperationFuture<String> resultFuture =
+ SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+
+ // Try to delete the given path.
+ Futures.addCallback(zkClient.delete(path), new FutureCallback<String>() {
+ private final FutureCallback<String> deleteCallback = this;
+
+ @Override
+ public void onSuccess(String result) {
+ // Path deleted successfully. Operation done.
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Failed to delete the given path
+ if (!(t instanceof KeeperException.NotEmptyException || t instanceof KeeperException.NoNodeException)) {
+ // For errors other than NotEmptyException, treat the operation as failed.
+ resultFuture.setException(t);
+ return;
+ }
+
+ // If failed because of NotEmptyException, get the list of children under the given path
+ Futures.addCallback(zkClient.getChildren(path), new FutureCallback<NodeChildren>() {
+
+ @Override
+ public void onSuccess(NodeChildren result) {
+ // Delete all children nodes recursively.
+ final List<OperationFuture<String>> deleteFutures = Lists.newLinkedList();
+ for (String child :result.getChildren()) {
+ deleteFutures.add(recursiveDelete(zkClient, path + "/" + child));
+ }
+
+ // When deletion of all children succeeded, delete the given path again.
+ Futures.successfulAsList(deleteFutures).addListener(new Runnable() {
+ @Override
+ public void run() {
+ for (OperationFuture<String> deleteFuture : deleteFutures) {
+ try {
+ // If any exception when deleting children, treat the operation as failed.
+ deleteFuture.get();
+ } catch (Exception e) {
+ resultFuture.setException(e.getCause());
+ }
+ }
+ Futures.addCallback(zkClient.delete(path), deleteCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If failed to get list of children, treat the operation as failed.
+ resultFuture.setException(t);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ return resultFuture;
+ }
+
+ /**
+ * Watch for the given path until it exists.
+ * Each watcher event re-issues the existence check (and sets a new watcher) as long as the completion
+ * future is not done.
+ * @param zkClient The {@link ZKClient} to use.
+ * @param path A ZooKeeper path to watch for existence.
+ * @param completion Future that is set to the path once the node is found to exist, or failed with the
+ * error of the existence check.
+ */
+ private static void watchExists(final ZKClient zkClient, final String path, final SettableFuture<String> completion) {
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (!completion.isDone()) {
+ watchExists(zkClient, path, completion);
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ if (result != null) {
+ completion.set(path);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ completion.setException(t);
+ }
+ });
+ }
+
+ /**
+ * Executes the given watchable operation on the path and delivers each successful result to the callback,
+ * re-executing the operation (and thereby setting a new watcher) whenever the watcher fires.
+ * If the operation fails with a NONODE error, it waits for the node to exist and then starts over.
+ * Any other failure is only logged; nothing in this method sets the watch again in that case.
+ * Once {@code cancelled} is set, results are no longer delivered and watcher events are ignored.
+ *
+ * @param operation The read operation to execute with a watcher.
+ * @param path Path to operate on.
+ * @param callback Callback receiving each successful result.
+ * @param cancelled Flag shared with the caller's {@link Cancellable}.
+ * @param <T> Type of the operation result.
+ */
+ private static <T> void watchChanges(final Operation<T> operation, final String path,
+ final Callback<T> callback, final AtomicBoolean cancelled) {
+ Futures.addCallback(operation.exec(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (!cancelled.get()) {
+ watchChanges(operation, path, callback, cancelled);
+ }
+ }
+ }), new FutureCallback<T>() {
+ @Override
+ public void onSuccess(T result) {
+ if (!cancelled.get()) {
+ callback.updated(result);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
+ final SettableFuture<String> existCompletion = SettableFuture.create();
+ existCompletion.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!cancelled.get()) {
+ watchChanges(operation, existCompletion.get(), callback, cancelled);
+ }
+ } catch (Exception e) {
+ // NOTE(review): this helper serves both data and children watches, so "children" may be misleading.
+ LOG.error("Failed to watch children for path " + path, e);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ watchExists(operation.getZKClient(), path, existCompletion);
+ return;
+ }
+ // NOTE(review): this helper serves both data and children watches, so "data" may be misleading.
+ LOG.error("Failed to watch data for path " + path + " " + t, t);
+ }
+ });
+ }
+
+ private ZKOperations() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
new file mode 100644
index 0000000..e5bd237
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides functionality for ZooKeeper interactions.
+ */
+package org.apache.twill.zookeeper;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
new file mode 100644
index 0000000..601f0bd
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
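+/**
+ * Unit tests for the retry delays computed by the strategies created through {@link RetryStrategies}.
+ */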
+public class RetryStrategyTest {
+
+ @Test
+ public void testNoRetry() {
+ RetryStrategy strategy = RetryStrategies.noRetry();
+ long startTime = System.currentTimeMillis();
+ for (int i = 1; i <= 10; i++) {
+ Assert.assertEquals(-1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ }
+
+ @Test
+ public void testLimit() {
+ RetryStrategy strategy = RetryStrategies.limit(10, RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS));
+ long startTime = System.currentTimeMillis();
+ for (int i = 1; i <= 10; i++) {
+ Assert.assertEquals(1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ Assert.assertEquals(-1L, strategy.nextRetry(11, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+
+ @Test
+ public void testUnlimited() {
+ RetryStrategy strategy = RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS);
+ long startTime = System.currentTimeMillis();
+ for (int i = 1; i <= 10; i++) {
+ Assert.assertEquals(1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ Assert.assertEquals(1L, strategy.nextRetry(100000, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+
+ @Test
+ public void testExponential() {
+ RetryStrategy strategy = RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS);
+ long startTime = System.currentTimeMillis();
+ for (int i = 1; i <= 16; i++) {
+ Assert.assertEquals(1L << (i - 1), strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ for (int i = 60; i <= 80; i++) {
+ Assert.assertEquals(60000, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ }
+
+ @Test
+ public void testExponentialLimit() {
+ RetryStrategy strategy = RetryStrategies.limit(99,
+ RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS));
+ long startTime = System.currentTimeMillis();
+ for (int i = 1; i <= 16; i++) {
+ Assert.assertEquals(1L << (i - 1), strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ for (int i = 60; i <= 80; i++) {
+ Assert.assertEquals(60000, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+ Assert.assertEquals(-1L, strategy.nextRetry(100, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+
+ @Test
+ public void testTimeLimit() throws InterruptedException {
+ RetryStrategy strategy = RetryStrategies.timeLimit(1, TimeUnit.SECONDS,
+ RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS));
+ long startTime = System.currentTimeMillis();
+ Assert.assertEquals(1L, strategy.nextRetry(1, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ TimeUnit.MILLISECONDS.sleep(1100);
+ Assert.assertEquals(-1L, strategy.nextRetry(2, startTime, RetryStrategy.OperationType.CREATE, "/"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
new file mode 100644
index 0000000..f1db74a
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Unit tests for ZooKeeper client operations, run against an {@link InMemoryZKServer}.
+ */
+public class ZKClientTest {
+
+ @Test
+ public void testChroot() throws Exception {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/chroot").build();
+ client.startAndWait();
+ try {
+ List<OperationFuture<String>> futures = Lists.newArrayList();
+ futures.add(client.create("/test1/test2", null, CreateMode.PERSISTENT));
+ futures.add(client.create("/test1/test3", null, CreateMode.PERSISTENT));
+ Futures.successfulAsList(futures).get();
+
+ Assert.assertNotNull(client.exists("/test1/test2").get());
+ Assert.assertNotNull(client.exists("/test1/test3").get());
+
+ } finally {
+ client.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testCreateParent() throws ExecutionException, InterruptedException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ client.startAndWait();
+
+ try {
+ String path = client.create("/test1/test2/test3/test4/test5",
+ "testing".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL).get();
+ Assert.assertTrue(path.startsWith("/test1/test2/test3/test4/test5"));
+
+ String dataPath = "";
+ for (int i = 1; i <= 4; i++) {
+ dataPath = dataPath + "/test" + i;
+ Assert.assertNull(client.getData(dataPath).get().getData());
+ }
+ Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData(path).get().getData()));
+ } finally {
+ client.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testGetChildren() throws ExecutionException, InterruptedException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ client.startAndWait();
+
+ try {
+ client.create("/test", null, CreateMode.PERSISTENT).get();
+ Assert.assertTrue(client.getChildren("/test").get().getChildren().isEmpty());
+
+ Futures.allAsList(ImmutableList.of(client.create("/test/c1", null, CreateMode.EPHEMERAL),
+ client.create("/test/c2", null, CreateMode.EPHEMERAL))).get();
+
+ NodeChildren nodeChildren = client.getChildren("/test").get();
+ Assert.assertEquals(2, nodeChildren.getChildren().size());
+
+ Assert.assertEquals(ImmutableSet.of("c1", "c2"), ImmutableSet.copyOf(nodeChildren.getChildren()));
+
+ } finally {
+ client.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+
+  /**
+   * Verifies that a node created without data reads back {@code null} and that data written with
+   * {@code setData} can be read back unchanged.
+   */
+  @Test
+  public void testSetData() throws ExecutionException, InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      client.startAndWait();
+
+      try {
+        client.create("/test", null, CreateMode.PERSISTENT).get();
+        Assert.assertNull(client.getData("/test").get().getData());
+
+        client.setData("/test", "testing".getBytes()).get();
+        Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData("/test").get().getData()));
+
+      } finally {
+        // Stop the client as the other tests do, so that it does not outlive the test.
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+ @Test
+ public void testExpireRewatch() throws InterruptedException, IOException, ExecutionException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ final CountDownLatch expireReconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean expired = new AtomicBoolean(false);
+ final ZKClientService client = ZKClientServices.delegate(ZKClients.reWatchOnExpire(
+ ZKClientService.Builder.of(zkServer.getConnectionStr())
+ .setSessionTimeout(2000)
+ .setConnectionWatcher(new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.Expired) {
+ expired.set(true);
+ } else if (event.getState() == Event.KeeperState.SyncConnected && expired.compareAndSet(true, true)) {
+ expireReconnectLatch.countDown();
+ }
+ }
+ }).build()));
+ client.startAndWait();
+
+ try {
+ final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<Watcher.Event.EventType>();
+ client.exists("/expireRewatch", new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ client.exists("/expireRewatch", this);
+ events.add(event.getType());
+ }
+ });
+
+ client.create("/expireRewatch", null, CreateMode.PERSISTENT);
+ Assert.assertEquals(Watcher.Event.EventType.NodeCreated, events.poll(2, TimeUnit.SECONDS));
+
+ KillZKSession.kill(client.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 1000);
+
+ Assert.assertTrue(expireReconnectLatch.await(5, TimeUnit.SECONDS));
+
+ client.delete("/expireRewatch");
+ Assert.assertEquals(Watcher.Event.EventType.NodeDeleted, events.poll(4, TimeUnit.SECONDS));
+ } finally {
+ client.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+
+ @Test
+ public void testRetry() throws ExecutionException, InterruptedException, TimeoutException {
+ File dataDir = Files.createTempDir();
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build();
+ zkServer.startAndWait();
+ int port = zkServer.getLocalAddress().getPort();
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ ZKClientService client = ZKClientServices.delegate(ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkServer.getConnectionStr()).setConnectionWatcher(new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.Disconnected) {
+ disconnectLatch.countDown();
+ }
+ }
+ }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
+ client.startAndWait();
+
+ zkServer.stopAndWait();
+
+ Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+
+ final CountDownLatch createLatch = new CountDownLatch(1);
+ Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ createLatch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ t.printStackTrace(System.out);
+ }
+ });
+
+ TimeUnit.SECONDS.sleep(2);
+ zkServer = InMemoryZKServer.builder()
+ .setDataDir(dataDir)
+ .setAutoCleanDataDir(true)
+ .setPort(port)
+ .setTickTime(1000)
+ .build();
+ zkServer.startAndWait();
+
+ try {
+ Assert.assertTrue(createLatch.await(5, TimeUnit.SECONDS));
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
new file mode 100644
index 0000000..9518d6e
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Unit test for the helper methods in {@link ZKOperations}.
+ */
+public class ZKOperationsTest {
+
+ @Test
+ public void recursiveDelete() throws ExecutionException, InterruptedException, TimeoutException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ client.startAndWait();
+
+ try {
+ client.create("/test1/test10/test101", null, CreateMode.PERSISTENT).get();
+ client.create("/test1/test10/test102", null, CreateMode.PERSISTENT).get();
+ client.create("/test1/test10/test103", null, CreateMode.PERSISTENT).get();
+
+ client.create("/test1/test11/test111", null, CreateMode.PERSISTENT).get();
+ client.create("/test1/test11/test112", null, CreateMode.PERSISTENT).get();
+ client.create("/test1/test11/test113", null, CreateMode.PERSISTENT).get();
+
+ ZKOperations.recursiveDelete(client, "/test1").get(2, TimeUnit.SECONDS);
+
+ Assert.assertNull(client.exists("/test1").get(2, TimeUnit.SECONDS));
+
+ } finally {
+ client.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/resources/logback-test.xml b/twill-zookeeper/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..157df6e
--- /dev/null
+++ b/twill-zookeeper/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
deleted file mode 100644
index b11bc7a..0000000
--- a/yarn/pom.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>twill-parent</artifactId>
- <groupId>org.apache.twill</groupId>
- <version>0.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>twill-yarn</artifactId>
- <name>Twill Apache Hadoop YARN library</name>
-
- <properties>
- <output.dir>target/classes</output.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-discovery-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>${output.dir}</outputDirectory>
- </build>
-
- <profiles>
- <profile>
- <id>hadoop-2.0</id>
- <properties>
- <output.dir>${hadoop20.output.dir}</output.dir>
- </properties>
- </profile>
- <profile>
- <id>hadoop-2.1</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- <profile>
- <id>hadoop-2.2</id>
- <build>
- <resources>
- <resource>
- <directory>${hadoop20.output.dir}</directory>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
- </profile>
- </profiles>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
deleted file mode 100644
index d98dee1..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import org.apache.twill.internal.yarn.ports.AMRMClient;
-import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
-import org.apache.twill.internal.yarn.ports.AllocationResponse;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * {@link YarnAMClient} implementation for Hadoop 2.0, built on top of the ported {@link AMRMClient}.
- *
- * <p>The tracker address and URL must be provided through {@link #setTracker(InetSocketAddress, URL)}
- * before this service is started; {@link #startUp()} fails otherwise.</p>
- */
-public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
-
-  /** Wraps a YARN {@link ContainerStatus} into a {@link Hadoop20YarnContainerStatus}. */
-  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM =
-    new Function<ContainerStatus, YarnContainerStatus>() {
-      @Override
-      public YarnContainerStatus apply(ContainerStatus status) {
-        return new Hadoop20YarnContainerStatus(status);
-      }
-    };
-
-  private final ContainerId containerId;
-  /** Outstanding container requests, keyed by the id returned from the request builder. */
-  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
-  private final AMRMClient amrmClient;
-  private final YarnNMClient nmClient;
-  private InetSocketAddress trackerAddr;
-  private URL trackerUrl;
-  /** Resource limits reported by the registration response; assigned in {@link #startUp()}. */
-  private Resource maxCapability;
-  private Resource minCapability;
-
-  /**
-   * Creates the client for the application master container identified by the
-   * {@link ApplicationConstants#AM_CONTAINER_ID_ENV} environment variable.
-   *
-   * @param conf configuration used to initialize the resource manager and node manager clients
-   * @throws IllegalArgumentException if the container id environment variable is not set
-   */
-  public Hadoop20YarnAMClient(Configuration conf) {
-    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    Preconditions.checkArgument(masterContainerId != null,
-                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
-
-    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
-    this.amrmClient.init(conf);
-    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
-  }
-
-  /**
-   * Starts the resource manager client, registers the application master using the tracker
-   * address and URL, and records the minimum and maximum resource capabilities from the response.
-   */
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
-    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
-    amrmClient.start();
-
-    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
-                                                                                      trackerAddr.getPort(),
-                                                                                      trackerUrl.toString());
-    maxCapability = response.getMaximumResourceCapability();
-    minCapability = response.getMinimumResourceCapability();
-  }
-
-  /**
-   * Unregisters the application master with a {@link FinalApplicationStatus#SUCCEEDED} status and stops
-   * the resource manager client.
-   */
-  @Override
-  protected void shutDown() throws Exception {
-    try {
-      amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
-    } finally {
-      // Stop the client even when unregistering fails, so that it is never left running.
-      amrmClient.stop();
-    }
-  }
-
-  @Override
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  /** Returns the value of the {@link ApplicationConstants#NM_HOST_ENV} environment variable. */
-  @Override
-  public String getHost() {
-    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
-  }
-
-  @Override
-  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
-    this.trackerAddr = trackerAddr;
-    this.trackerUrl = trackerUrl;
-  }
-
-  /**
-   * Performs one allocate call and reports the outcome to the handler: newly allocated containers are
-   * handed over as launchers, and completed container statuses are reported afterwards.
-   * Containers whose launcher was not used by the handler are released again.
-   *
-   * @param progress progress value passed to the resource manager client
-   * @param handler callback receiving acquired launchers and completed container statuses
-   */
-  @Override
-  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
-    AllocationResponse response = amrmClient.allocate(progress);
-    List<ProcessLauncher<YarnContainerInfo>> launchers
-      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
-    for (Container container : response.getAllocatedContainers()) {
-      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
-    }
-
-    if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
-
-      // If no process has been launched through the given launcher, return the container.
-      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
-        // This cast always works.
-        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
-        if (!launcher.isLaunched()) {
-          Container container = launcher.getContainerInfo().getContainer();
-          LOG.info("Nothing to run in container, releasing it: {}", container);
-          amrmClient.releaseAssignedContainer(container.getId());
-        }
-      }
-    }
-
-    List<YarnContainerStatus> completed = ImmutableList.copyOf(
-      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
-    if (!completed.isEmpty()) {
-      handler.completed(completed);
-    }
-  }
-
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability) {
-    return addContainerRequest(capability, 1);
-  }
-
-  /**
-   * Creates a builder for requesting {@code count} containers. The capability is adjusted to the
-   * limits of the cluster (see {@link #adjustCapability(Resource)}) when the builder is created.
-   * Applying the builder submits one single-container request per requested container and returns
-   * a generated id that can be passed to {@link #completeContainerRequest(String)}.
-   */
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
-    return new ContainerRequestBuilder(adjustCapability(capability), count) {
-      @Override
-      public String apply() {
-        synchronized (Hadoop20YarnAMClient.this) {
-          String id = UUID.randomUUID().toString();
-
-          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
-          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
-          for (int i = 0; i < count; i++) {
-            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
-                                                                                  priority, 1);
-            containerRequests.put(id, request);
-            amrmClient.addContainerRequest(request);
-          }
-
-          return id;
-        }
-      }
-    };
-  }
-
-  /** Removes every outstanding container request that was registered under the given id. */
-  @Override
-  public synchronized void completeContainerRequest(String id) {
-    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
-      amrmClient.removeContainerRequest(request);
-    }
-  }
-
-  /**
-   * Fits the requested capability into the minimum/maximum capabilities reported at registration.
-   * Virtual cores are clamped to the [min, max] range; memory is capped at the maximum and then
-   * rounded up to a multiple of the minimum memory.
-   *
-   * <p>The given resource is modified in place and returned.</p>
-   */
-  private Resource adjustCapability(Resource resource) {
-    int cores = YarnUtils.getVirtualCores(resource);
-    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
-                                YarnUtils.getVirtualCores(minCapability));
-    // Try and set the virtual cores, which older versions of YARN don't support this.
-    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
-      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
-    }
-
-    // Remember the requested memory before mutating the resource so the log shows the real "from" value.
-    int requestedMemory = resource.getMemory();
-    int updatedMemory = Math.min(requestedMemory, maxCapability.getMemory());
-    int minMemory = minCapability.getMemory();
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
-
-    if (requestedMemory != updatedMemory) {
-      resource.setMemory(updatedMemory);
-      LOG.info("Adjust memory requirement from {} to {} MB.", requestedMemory, updatedMemory);
-    }
-
-    return resource;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
deleted file mode 100644
index bfec34e..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- * {@link YarnAppClient} implementation for Hadoop 2.0, backed by a {@link YarnClient}.
- */
-public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
-  private final YarnClient yarnClient;
-  /** User applied when no explicit user is given; taken from the "user.name" system property. */
-  private final String defaultUser;
-
-  public Hadoop20YarnAppClient(Configuration configuration) {
-    this.yarnClient = new YarnClientImpl();
-    yarnClient.init(configuration);
-    this.defaultUser = System.getProperty("user.name");
-  }
-
-  /**
-   * Creates a launcher that submits the application as the default user
-   * (the "user.name" system property read when this client was constructed).
-   */
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
-    return createLauncher(defaultUser, twillSpec);
-  }
-
-  /**
-   * Requests a new application id from YARN and returns a launcher for the application master.
-   * The user applies to this launcher only; it is not remembered by the client, so later calls
-   * to {@link #createLauncher(TwillSpecification)} are unaffected.
-   *
-   * @param user user set on the application submission context and on the AM launch context
-   * @param twillSpec specification providing the application name
-   */
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
-    // Request for new application
-    final GetNewApplicationResponse response = yarnClient.getNewApplication();
-    final ApplicationId appId = response.getApplicationId();
-
-    // Setup the context for application submission
-    final ApplicationSubmissionContext appSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
-    appSubmissionContext.setApplicationId(appId);
-    appSubmissionContext.setApplicationName(twillSpec.getName());
-    appSubmissionContext.setUser(user);
-
-    ApplicationSubmitter submitter = new ApplicationSubmitter() {
-
-      @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
-        ContainerLaunchContext context = launchContext.getLaunchContext();
-        addRMToken(context);
-        context.setUser(appSubmissionContext.getUser());
-        context.setResource(adjustMemory(response, capability));
-        appSubmissionContext.setAMContainerSpec(context);
-
-        try {
-          yarnClient.submitApplication(appSubmissionContext);
-          return new ProcessControllerImpl(yarnClient, appId);
-        } catch (YarnRemoteException e) {
-          LOG.error("Failed to submit application {}", appId, e);
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-
-    return new ApplicationMasterProcessLauncher(appId, submitter);
-  }
-
-  /**
-   * Caps the requested memory at the maximum reported in the response, then rounds it up to a
-   * multiple of the reported minimum memory. The given capability is modified in place and returned.
-   */
-  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
-    int minMemory = response.getMinimumResourceCapability().getMemory();
-
-    int updatedMemory = Math.min(capability.getMemory(), response.getMaximumResourceCapability().getMemory());
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
-
-    if (updatedMemory != capability.getMemory()) {
-      capability.setMemory(updatedMemory);
-    }
-
-    return capability;
-  }
-
-  /**
-   * When security is enabled, fetches a resource manager delegation token and adds it to the
-   * credentials carried by the launch context. Does nothing when security is disabled.
-   */
-  private void addRMToken(ContainerLaunchContext context) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    try {
-      Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
-
-      Configuration config = yarnClient.getConfig();
-      Token<TokenIdentifier> token = convertToken(
-        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
-        YarnUtils.getRMAddress(config));
-
-      LOG.info("Added RM delegation token {}", token);
-      credentials.addToken(token.getService(), token);
-
-      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-
-    } catch (Exception e) {
-      LOG.error("Fails to create credentials.", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Converts a YARN {@link DelegationToken} record into a security {@link Token}. When a service
-   * address is given, it is set as the token service through {@link SecurityUtil}.
-   */
-  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
-    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
-                                  protoToken.getPassword().array(),
-                                  new Text(protoToken.getKind()),
-                                  new Text(protoToken.getService()));
-    if (serviceAddr != null) {
-      SecurityUtil.setTokenService(token, serviceAddr);
-    }
-    return token;
-  }
-
-  @Override
-  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
-    return new ProcessControllerImpl(yarnClient, appId);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    yarnClient.start();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    yarnClient.stop();
-  }
-
-  /**
-   * Controller for one submitted application: reports its status and kills it on cancel.
-   */
-  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
-    private final YarnClient yarnClient;
-    private final ApplicationId appId;
-
-    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
-      this.yarnClient = yarnClient;
-      this.appId = appId;
-    }
-
-    @Override
-    public YarnApplicationReport getReport() {
-      try {
-        return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
-      } catch (YarnRemoteException e) {
-        LOG.error("Failed to get application report {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-
-    @Override
-    public void cancel() {
-      try {
-        yarnClient.killApplication(appId);
-      } catch (YarnRemoteException e) {
-        LOG.error("Failed to kill application {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
deleted file mode 100644
index 6c1b764..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
-
- private final ApplicationReport report;
-
- public Hadoop20YarnApplicationReport(ApplicationReport report) {
- this.report = report;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return report.getApplicationId();
- }
-
- @Override
- public ApplicationAttemptId getCurrentApplicationAttemptId() {
- return report.getCurrentApplicationAttemptId();
- }
-
- @Override
- public String getQueue() {
- return report.getQueue();
- }
-
- @Override
- public String getName() {
- return report.getName();
- }
-
- @Override
- public String getHost() {
- return report.getHost();
- }
-
- @Override
- public int getRpcPort() {
- return report.getRpcPort();
- }
-
- @Override
- public YarnApplicationState getYarnApplicationState() {
- return report.getYarnApplicationState();
- }
-
- @Override
- public String getDiagnostics() {
- return report.getDiagnostics();
- }
-
- @Override
- public String getTrackingUrl() {
- return report.getTrackingUrl();
- }
-
- @Override
- public String getOriginalTrackingUrl() {
- return report.getOriginalTrackingUrl();
- }
-
- @Override
- public long getStartTime() {
- return report.getStartTime();
- }
-
- @Override
- public long getFinishTime() {
- return report.getFinishTime();
- }
-
- @Override
- public FinalApplicationStatus getFinalApplicationStatus() {
- return report.getFinalApplicationStatus();
- }
-
- @Override
- public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
- return report.getApplicationResourceUsageReport();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
deleted file mode 100644
index 79b2cb5..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * {@link YarnContainerInfo} for Hadoop 2.0, backed by a YARN {@link Container} record.
- */
-public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
-
-  /** The YARN container this info object describes. */
-  private final Container yarnContainer;
-
-  /**
-   * @param container the YARN container to expose
-   */
-  public Hadoop20YarnContainerInfo(Container container) {
-    yarnContainer = container;
-  }
-
-  /** Returns the wrapped container, cast to the type expected by the caller. */
-  @Override
-  public <T> T getContainer() {
-    return (T) yarnContainer;
-  }
-
-  /** Returns the string form of the container id. */
-  @Override
-  public String getId() {
-    return yarnContainer.getId().toString();
-  }
-
-  /**
-   * Resolves the host of the container's node id into an {@link InetAddress}.
-   * An {@link UnknownHostException} is rethrown through {@link Throwables#propagate(Throwable)}.
-   */
-  @Override
-  public InetAddress getHost() {
-    final String nodeHost = yarnContainer.getNodeId().getHost();
-    try {
-      return InetAddress.getByName(nodeHost);
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /** Returns the port of the container's node id. */
-  @Override
-  public int getPort() {
-    return yarnContainer.getNodeId().getPort();
-  }
-
-  /** Returns the memory value of the container's resource. */
-  @Override
-  public int getMemoryMB() {
-    return yarnContainer.getResource().getMemory();
-  }
-
-  /** Returns the virtual cores of the container's resource, as reported by {@link YarnUtils}. */
-  @Override
-  public int getVirtualCores() {
-    return YarnUtils.getVirtualCores(yarnContainer.getResource());
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
deleted file mode 100644
index cc61856..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- * {@link YarnContainerStatus} for Hadoop 2.0. Forwards to the wrapped YARN {@link ContainerStatus}.
- */
-public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
-
-  /** The YARN status record all getters forward to. */
-  private final ContainerStatus status;
-
-  /**
-   * @param containerStatus the YARN container status to expose
-   */
-  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
-    status = containerStatus;
-  }
-
-  /** Returns the string form of the container id. */
-  @Override
-  public String getContainerId() {
-    return status.getContainerId().toString();
-  }
-
-  @Override
-  public ContainerState getState() {
-    return status.getState();
-  }
-
-  @Override
-  public int getExitStatus() {
-    return status.getExitStatus();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return status.getDiagnostics();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
deleted file mode 100644
index b1f6d66..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link YarnLaunchContext} for Hadoop 2.0, backed by a YARN {@link ContainerLaunchContext} record
- * that is created when this object is constructed.
- */
-public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
-
-  /** Unwraps a {@link YarnLocalResource} into the underlying YARN {@link LocalResource}. */
-  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM =
-    new Function<YarnLocalResource, LocalResource>() {
-      @Override
-      public LocalResource apply(YarnLocalResource wrapper) {
-        return wrapper.getLocalResource();
-      }
-    };
-
-  /** The YARN record every setter and getter operates on. */
-  private final ContainerLaunchContext context;
-
-  public Hadoop20YarnLaunchContext() {
-    context = Records.newRecord(ContainerLaunchContext.class);
-  }
-
-  /** Returns the underlying launch context record, cast to the type expected by the caller. */
-  @Override
-  public <T> T getLaunchContext() {
-    return (T) context;
-  }
-
-  /** Encodes the credentials through {@link YarnUtils} and stores them as the container tokens. */
-  @Override
-  public void setCredentials(Credentials credentials) {
-    context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-  }
-
-  /** Stores a transformed view of the given map whose values are the underlying YARN resources. */
-  @Override
-  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
-    final Map<String, LocalResource> yarnResources = Maps.transformValues(localResources, RESOURCE_TRANSFORM);
-    context.setLocalResources(yarnResources);
-  }
-
-  @Override
-  public void setServiceData(Map<String, ByteBuffer> serviceData) {
-    context.setServiceData(serviceData);
-  }
-
-  @Override
-  public Map<String, String> getEnvironment() {
-    return context.getEnvironment();
-  }
-
-  @Override
-  public void setEnvironment(Map<String, String> environment) {
-    context.setEnvironment(environment);
-  }
-
-  @Override
-  public List<String> getCommands() {
-    return context.getCommands();
-  }
-
-  @Override
-  public void setCommands(List<String> commands) {
-    context.setCommands(commands);
-  }
-
-  @Override
-  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
-    context.setApplicationACLs(acls);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
deleted file mode 100644
index b327b94..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- *
- */
-public final class Hadoop20YarnLocalResource implements YarnLocalResource {
-
- private final LocalResource localResource;
-
- public Hadoop20YarnLocalResource() {
- this.localResource = Records.newRecord(LocalResource.class);
- }
-
- @Override
- public <T> T getLocalResource() {
- return (T) localResource;
- }
-
- @Override
- public URL getResource() {
- return localResource.getResource();
- }
-
- @Override
- public void setResource(URL resource) {
- localResource.setResource(resource);
- }
-
- @Override
- public long getSize() {
- return localResource.getSize();
- }
-
- @Override
- public void setSize(long size) {
- localResource.setSize(size);
- }
-
- @Override
- public long getTimestamp() {
- return localResource.getTimestamp();
- }
-
- @Override
- public void setTimestamp(long timestamp) {
- localResource.setTimestamp(timestamp);
- }
-
- @Override
- public LocalResourceType getType() {
- return localResource.getType();
- }
-
- @Override
- public void setType(LocalResourceType type) {
- localResource.setType(type);
- }
-
- @Override
- public LocalResourceVisibility getVisibility() {
- return localResource.getVisibility();
- }
-
- @Override
- public void setVisibility(LocalResourceVisibility visibility) {
- localResource.setVisibility(visibility);
- }
-
- @Override
- public String getPattern() {
- return localResource.getPattern();
- }
-
- @Override
- public void setPattern(String pattern) {
- localResource.setPattern(pattern);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
deleted file mode 100644
index 98ecc67..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- *
- */
-public final class Hadoop20YarnNMClient implements YarnNMClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
-
- private final YarnRPC yarnRPC;
- private final Configuration yarnConf;
-
- public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
- this.yarnRPC = yarnRPC;
- this.yarnConf = yarnConf;
- }
-
- @Override
- public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
- ContainerLaunchContext context = launchContext.getLaunchContext();
- context.setUser(System.getProperty("user.name"));
-
- Container container = containerInfo.getContainer();
-
- context.setContainerId(container.getId());
- context.setResource(container.getResource());
-
- StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(context);
-
- ContainerManager manager = connectContainerManager(container);
- try {
- manager.startContainer(startRequest);
- return new ContainerTerminator(container, manager);
- } catch (YarnRemoteException e) {
- LOG.error("Error in launching process", e);
- throw Throwables.propagate(e);
- }
-
- }
-
- /**
- * Helper to connect to container manager (node manager).
- */
- private ContainerManager connectContainerManager(Container container) {
- String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
- }
-
- private static final class ContainerTerminator implements Cancellable {
-
- private final Container container;
- private final ContainerManager manager;
-
- private ContainerTerminator(Container container, ContainerManager manager) {
- this.container = container;
- this.manager = manager;
- }
-
- @Override
- public void cancel() {
- LOG.info("Request to stop container {}.", container.getId());
- StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(container.getId());
- try {
- manager.stopContainer(stopRequest);
- boolean completed = false;
- while (!completed) {
- GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
- statusRequest.setContainerId(container.getId());
- GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
- LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
-
- completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
- }
- LOG.info("Container {} stopped.", container.getId());
- } catch (YarnRemoteException e) {
- LOG.error("Fail to stop container {}", container.getId(), e);
- throw Throwables.propagate(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
deleted file mode 100644
index 26b6fa2..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.Service;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public interface AMRMClient extends Service {
-
- /**
- * Value used to define no locality.
- */
- static final String ANY = "*";
-
- /**
- * Object to represent container request for resources.
- * Resources may be localized to nodes and racks.
- * Resources may be assigned priorities.
- * Can ask for multiple containers of a given type.
- */
- public static class ContainerRequest {
- Resource capability;
- String[] hosts;
- String[] racks;
- Priority priority;
- int containerCount;
-
- public ContainerRequest(Resource capability, String[] hosts,
- String[] racks, Priority priority, int containerCount) {
- this.capability = capability;
- this.hosts = (hosts != null ? hosts.clone() : null);
- this.racks = (racks != null ? racks.clone() : null);
- this.priority = priority;
- this.containerCount = containerCount;
- }
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Capability[").append(capability).append("]");
- sb.append("Priority[").append(priority).append("]");
- sb.append("ContainerCount[").append(containerCount).append("]");
- return sb.toString();
- }
- }
-
- /**
- * Register the application master. This must be called before any
- * other interaction
- * @param appHostName Name of the host on which master is running
- * @param appHostPort Port master is listening on
- * @param appTrackingUrl URL at which the master info can be seen
- * @return <code>RegisterApplicationMasterResponse</code>
- * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
- */
- public RegisterApplicationMasterResponse
- registerApplicationMaster(String appHostName,
- int appHostPort,
- String appTrackingUrl)
- throws YarnRemoteException;
-
- /**
- * Request additional containers and receive new container allocations.
- * Requests made via <code>addContainerRequest</code> are sent to the
- * <code>ResourceManager</code>. New containers assigned to the master are
- * retrieved. Status of completed containers and node health updates are
- * also retrieved.
- * This also doubles as a heartbeat to the ResourceManager and must be
- * made periodically.
- * The call may not always return any new allocations of containers.
- * App should not make concurrent allocate requests. May cause request loss.
- * @param progressIndicator Indicates progress made by the master
- * @return the response of the allocate request
- * @throws YarnRemoteException
- */
- public AllocationResponse allocate(float progressIndicator)
- throws YarnRemoteException;
-
- /**
- * Unregister the Application Master. This must be called in the end.
- * @param appStatus Success/Failure status of the master
- * @param appMessage Diagnostics message on failure
- * @param appTrackingUrl New URL to get master info
- * @throws YarnRemoteException
- */
- public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
- String appMessage,
- String appTrackingUrl)
- throws YarnRemoteException;
-
- /**
- * Request containers for resources before calling <code>allocate</code>.
- * @param req Resource request
- */
- public void addContainerRequest(ContainerRequest req);
-
- /**
- * Remove previous container request. The previous container request may have
- * already been sent to the ResourceManager. So even after the remove request
- * the app must be prepared to receive an allocation for the previous request
- * even after the remove request
- * @param req Resource request
- */
- public void removeContainerRequest(ContainerRequest req);
-
- /**
- * Release containers assigned by the Resource Manager. If the app cannot use
- * the container or wants to give up the container then it can release it.
- * The app needs to make new requests for the released resource capability if
- * it still needs it. For example, if it released non-local resources
- * @param containerId
- */
- public void releaseAssignedContainer(ContainerId containerId);
-
- /**
- * Get the currently available resources in the cluster.
- * A valid value is available after a call to allocate has been made
- * @return Currently available resources
- */
- public Resource getClusterAvailableResources();
-
- /**
- * Get the current number of nodes in the cluster.
- * A valid values is available after a call to allocate has been made
- * @return Current number of nodes in the cluster
- */
- public int getClusterNodeCount();
-}