You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:35 UTC
[19/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java b/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
new file mode 100644
index 0000000..c0aa7ee
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Unit test for {@link Services} methods.
+ */
+public class ServicesTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServicesTest.class);
+
+ /**
+  * Starts three services through {@code Services.chainStart} and stops them through
+  * {@code Services.chainStop} in the opposite order. The services share one flag, so the state
+  * checks inside {@link DummyService} fail if two transitions ever overlap.
+  */
+ @Test
+ public void testChain() throws ExecutionException, InterruptedException {
+   AtomicBoolean inTransition = new AtomicBoolean(false);
+   Service first = new DummyService("s1", inTransition);
+   Service second = new DummyService("s2", inTransition);
+   Service third = new DummyService("s3", inTransition);
+
+   // Wait for the chain future first, then for every future it produced.
+   Futures.allAsList(Services.chainStart(first, second, third).get()).get();
+   Futures.allAsList(Services.chainStop(third, second, first).get()).get();
+ }
+
+ /**
+  * Verifies the future returned by {@code Services.getCompletionFuture}: it completes normally
+  * when the service stops cleanly and fails with an {@link ExecutionException} when the
+  * shutdown fails.
+  */
+ @Test
+ public void testCompletion() throws ExecutionException, InterruptedException {
+   Service service = new DummyService("s1", new AtomicBoolean());
+   ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+
+   service.start();
+   service.stop();
+
+   completion.get();
+
+   AtomicBoolean transiting = new AtomicBoolean();
+   service = new DummyService("s2", transiting);
+   completion = Services.getCompletionFuture(service);
+
+   service.startAndWait();
+   // Pretend another transition is in progress so that the state check in shutDown() throws.
+   transiting.set(true);
+   service.stop();
+
+   try {
+     completion.get();
+     Assert.fail("Completion future should fail when the service fails to shut down");
+   } catch (ExecutionException e) {
+     // Expected
+   }
+ }
+
+ /**
+  * Test service whose startup and shutdown each sleep for half a second and verify, through a
+  * shared flag, that no other transition is in progress at the same time.
+  */
+ private static final class DummyService extends AbstractIdleService {
+
+   private final String name;
+   private final AtomicBoolean transiting;
+
+   private DummyService(String name, AtomicBoolean transiting) {
+     this.name = name;
+     this.transiting = transiting;
+   }
+
+   @Override
+   protected void startUp() throws Exception {
+     transit("Starting: ", "Started: ");
+   }
+
+   @Override
+   protected void shutDown() throws Exception {
+     transit("Stopping: ", "Stopped: ");
+   }
+
+   /**
+    * Runs one guarded transition: claims the shared flag, logs, sleeps, logs again and releases
+    * the flag. Either flag check throws if the flag is not in the expected state.
+    */
+   private void transit(String beginPrefix, String endPrefix) throws InterruptedException {
+     Preconditions.checkState(transiting.compareAndSet(false, true));
+     LOG.info(beginPrefix + name);
+     TimeUnit.MILLISECONDS.sleep(500);
+     LOG.info(endPrefix + name);
+     Preconditions.checkState(transiting.compareAndSet(true, false));
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
new file mode 100644
index 0000000..198f77f
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Unit tests for locations created through {@link LocalLocationFactory}.
+ */
+public class LocalLocationTest {
+
+  /**
+   * Builds a small tree of files and directories under the system temp directory, then checks
+   * that {@code delete(true)} succeeds and that the base location no longer exists.
+   */
+  @Test
+  public void testDelete() throws IOException {
+    LocationFactory factory = new LocalLocationFactory(tmpDir());
+
+    Location base = factory.create("test").getTempFile(".tmp");
+    Assert.assertTrue(base.mkdirs());
+
+    assertTempFileCreated(base, "test1");
+    assertTempFileCreated(base, "test2");
+
+    Location subDir = base.append("test3");
+    Assert.assertTrue(subDir.mkdirs());
+
+    assertTempFileCreated(subDir, "test4");
+    assertTempFileCreated(subDir, "test5");
+
+    Assert.assertTrue(base.delete(true));
+    Assert.assertFalse(base.exists());
+  }
+
+  /**
+   * Checks that a factory wrapped by {@code LocationFactories.namespace} places locations created
+   * from a name and from a URI under the namespace.
+   */
+  @Test
+  public void testHelper() {
+    LocationFactory namespaced = LocationFactories.namespace(new LocalLocationFactory(tmpDir()), "testhelper");
+
+    Location byName = namespaced.create("test");
+    Assert.assertTrue(byName.toURI().getPath().endsWith("testhelper/test"));
+
+    Location byUri = namespaced.create(URI.create("test2"));
+    Assert.assertTrue(byUri.toURI().getPath().endsWith("testhelper/test2"));
+  }
+
+  /** Returns the directory named by the {@code java.io.tmpdir} system property. */
+  private static File tmpDir() {
+    return new File(System.getProperty("java.io.tmpdir"));
+  }
+
+  /** Asserts that a new temp file derived from {@code parent/child} can be created. */
+  private static void assertTempFileCreated(Location parent, String child) throws IOException {
+    Assert.assertTrue(parent.append(child).getTempFile(".tmp").createNew());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
new file mode 100644
index 0000000..faff711
--- /dev/null
+++ b/twill-core/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-core</artifactId>
+ <name>Twill core library</name>
+
+ <dependencies>
+ <!-- Sibling Twill modules: same groupId and version as this module. -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Third-party libraries. No version or scope is declared here; presumably both are managed by twill-parent (TODO confirm). -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
new file mode 100644
index 0000000..974639d
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.Threads;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
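+/**
+ * Base {@link ServiceController} that runs its lifecycle through an internal
+ * {@link AbstractIdleService} and relays every state change to the registered listeners.
+ */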
+public abstract class AbstractExecutionServiceController implements ServiceController {
+
+ private final RunId runId;
+ private final ListenerExecutors listenerExecutors;
+ private final Service serviceDelegate;
+
+ /**
+  * Creates a controller for the given run.
+  *
+  * @param runId the identifier later returned by {@link #getRunId()}
+  */
+ protected AbstractExecutionServiceController(RunId runId) {
+   this.runId = runId;
+   this.listenerExecutors = new ListenerExecutors();
+   this.serviceDelegate = new ServiceDelegate();
+   // Hook the dispatcher up exactly once. Registering it from start() would add it again on
+   // every start() call and make each state change reach the user listeners several times.
+   this.serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+  * Performs the actual startup work. Invoked from the startUp() of the internal delegate service.
+  */
+ protected abstract void startUp();
+
+ /**
+  * Performs the actual shutdown work. Invoked from the shutDown() of the internal delegate service.
+  */
+ protected abstract void shutDown();
+
+ @Override
+ public final RunId getRunId() {
+   return runId;
+ }
+
+ /**
+  * Registers a listener for state changes. If a state change has already been dispatched, the
+  * most recent one is replayed to the new listener right away.
+  */
+ @Override
+ public final void addListener(Listener listener, Executor executor) {
+   listenerExecutors.addListener(new ListenerExecutor(listener, executor));
+ }
+
+ /**
+  * Starts the internal delegate service.
+  *
+  * @return the future returned by the delegate's start()
+  */
+ @Override
+ public final ListenableFuture<State> start() {
+   return serviceDelegate.start();
+ }
+
+ /**
+ * Calls {@link #start()} and blocks until the returned future completes.
+ */
+ @Override
+ public final State startAndWait() {
+ return Futures.getUnchecked(start());
+ }
+
+ /**
+ * Reports whether the internal delegate service is running.
+ */
+ @Override
+ public final boolean isRunning() {
+ return serviceDelegate.isRunning();
+ }
+
+ /**
+ * Returns the current state of the internal delegate service.
+ */
+ @Override
+ public final State state() {
+ return serviceDelegate.state();
+ }
+
+ /**
+ * Calls {@link #stop()} and blocks until the returned future completes.
+ */
+ @Override
+ public final State stopAndWait() {
+ return Futures.getUnchecked(stop());
+ }
+
+ /**
+ * Stops the internal delegate service and returns the future it hands back.
+ */
+ @Override
+ public final ListenableFuture<State> stop() {
+ return serviceDelegate.stop();
+ }
+
+ /**
+  * Returns the executor used to run the startup or shutdown work for the given state. Every
+  * submitted command runs on a new daemon thread named after this controller's class and the state.
+  *
+  * @param state the state the delegate service is transiting through
+  * @return an executor that spawns one daemon thread per command
+  */
+ protected Executor executor(final State state) {
+   // Resolve the name out here: inside the anonymous Executor below, getClass() would refer to
+   // the anonymous class itself, whose simple name is the empty string.
+   final String threadName = getClass().getSimpleName() + " " + state;
+   return new Executor() {
+     @Override
+     public void execute(Runnable command) {
+       Thread t = new Thread(command, threadName);
+       t.setDaemon(true);
+       t.start();
+     }
+   };
+ }
+
+
+ /**
+ * Service that forwards startup, shutdown and executor selection to the enclosing controller.
+ */
+ private final class ServiceDelegate extends AbstractIdleService {
+ @Override
+ protected void startUp() throws Exception {
+ AbstractExecutionServiceController.this.startUp();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ AbstractExecutionServiceController.this.shutDown();
+ }
+
+ @Override
+ protected Executor executor(State state) {
+ // Use the controller's (overridable) executor rather than the one inherited from AbstractIdleService.
+ return AbstractExecutionServiceController.this.executor(state);
+ }
+ }
+
+ /**
+  * Fans every service state change out to all registered listeners and remembers the latest
+  * change, so that a listener registered afterwards is told about it immediately.
+  */
+ private static final class ListenerExecutors implements Listener {
+
+   /** One state-change notification that can be applied to any listener. */
+   private interface Callback {
+     void call(Listener listener);
+   }
+
+   private final Queue<ListenerExecutor> listeners = new ConcurrentLinkedQueue<ListenerExecutor>();
+   private final AtomicReference<Callback> lastState = new AtomicReference<Callback>();
+
+   private synchronized void addListener(final ListenerExecutor listener) {
+     listeners.add(listener);
+     Callback latest = lastState.get();
+     if (latest == null) {
+       // No state change has been seen yet, so there is nothing to replay.
+       return;
+     }
+     latest.call(listener);
+   }
+
+   @Override
+   public synchronized void starting() {
+     broadcast(new Callback() {
+       @Override
+       public void call(Listener listener) {
+         listener.starting();
+       }
+     });
+   }
+
+   @Override
+   public synchronized void running() {
+     broadcast(new Callback() {
+       @Override
+       public void call(Listener listener) {
+         listener.running();
+       }
+     });
+   }
+
+   @Override
+   public synchronized void stopping(final State from) {
+     broadcast(new Callback() {
+       @Override
+       public void call(Listener listener) {
+         listener.stopping(from);
+       }
+     });
+   }
+
+   @Override
+   public synchronized void terminated(final State from) {
+     broadcast(new Callback() {
+       @Override
+       public void call(Listener listener) {
+         listener.terminated(from);
+       }
+     });
+   }
+
+   @Override
+   public synchronized void failed(final State from, final Throwable failure) {
+     broadcast(new Callback() {
+       @Override
+       public void call(Listener listener) {
+         listener.failed(from, failure);
+       }
+     });
+   }
+
+   /**
+    * Records the notification as the latest state change and applies it to every listener
+    * registered so far. Only called from the synchronized listener methods above.
+    */
+   private void broadcast(Callback notification) {
+     lastState.set(notification);
+     for (ListenerExecutor listener : listeners) {
+       notification.call(listener);
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
new file mode 100644
index 0000000..5806f9d
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.logging.LogEntryDecoder;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An abstract base class for {@link org.apache.twill.api.TwillController} implementations that use ZooKeeper to control a
+ * running Twill application.
+ */
+public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
+ private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
+ private static final long SHUTDOWN_TIMEOUT_MS = 2000;
+ private static final long LOG_FETCH_TIMEOUT_MS = 5000;
+
+ private final Queue<LogHandler> logHandlers;
+ private final KafkaClient kafkaClient;
+ private final DiscoveryServiceClient discoveryServiceClient;
+ private final LogPollerThread logPoller;
+
+ /**
+  * Creates a controller for the given run.
+  *
+  * @param runId id of the run; also used to build the Kafka namespace under ZooKeeper
+  * @param zkClient ZooKeeper client used for control messages, Kafka and service discovery
+  * @param logHandlers initial log handlers; they are copied into an internal queue
+  */
+ public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
+   super(runId, zkClient);
+   this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
+   this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+   this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
+   Iterables.addAll(this.logHandlers, logHandlers);
+   // Hand the poller the internal queue, not the caller's iterable. Otherwise handlers
+   // registered later through addLogHandler() would never receive a log entry.
+   this.logPoller = new LogPollerThread(runId, kafkaClient, this.logHandlers);
+ }
+
+ /**
+  * Starts the log poller if at least one log handler is registered.
+  */
+ @Override
+ protected void doStartUp() {
+   if (!logHandlers.isEmpty()) {
+     startLogPollerIfNew();
+   }
+ }
+
+ /**
+  * Asks the log poller to stop and waits up to {@code SHUTDOWN_TIMEOUT_MS} for it to finish.
+  */
+ @Override
+ protected void doShutDown() {
+   logPoller.terminate();
+   try {
+     // Wait for the poller thread to stop.
+     logPoller.join(SHUTDOWN_TIMEOUT_MS);
+   } catch (InterruptedException e) {
+     LOG.warn("Joining of log poller thread interrupted.", e);
+     // Keep the interrupt visible to whoever runs the shutdown.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Registers a log handler and starts the log poller if it has never been started.
+  */
+ @Override
+ public final synchronized void addLogHandler(LogHandler handler) {
+   logHandlers.add(handler);
+   startLogPollerIfNew();
+ }
+
+ /**
+  * Starts the poller thread only if it has never been started. A thread can be started once;
+  * checking isAlive() is not enough because it is also false for a thread that has already
+  * finished, and a second start() throws IllegalThreadStateException.
+  */
+ private synchronized void startLogPollerIfNew() {
+   if (logPoller.getState() == Thread.State.NEW) {
+     logPoller.start();
+   }
+ }
+
+ /**
+ * Looks up the given service name through the ZooKeeper-backed discovery client created in the constructor.
+ */
+ @Override
+ public final Iterable<Discoverable> discoverService(String serviceName) {
+ return discoveryServiceClient.discover(serviceName);
+ }
+
+ /**
+ * Sends a set-instances system message for the given runnable.
+ *
+ * @return the future from {@code sendMessage}, which carries {@code newCount} as its result value
+ */
+ @Override
+ public final ListenableFuture<Integer> changeInstances(String runnable, int newCount) {
+ return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
+ }
+
+ /**
+  * Daemon thread that reads log entries from the Kafka log topic and hands each decoded entry
+  * to all log handlers. It keeps retrying after failures until {@link #terminate()} is called.
+  */
+ private static final class LogPollerThread extends Thread {
+
+   // Pause between attempts after a Kafka failure, so a persistent error does not spin the CPU
+   // and flood the log.
+   private static final long RETRY_DELAY_MS = 1000;
+
+   private final KafkaClient kafkaClient;
+   private final Iterable<LogHandler> logHandlers;
+   private volatile boolean running = true;
+
+   LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
+     super("twill-log-poller-" + runId.getId());
+     setDaemon(true);
+     this.kafkaClient = kafkaClient;
+     this.logHandlers = logHandlers;
+   }
+
+   @Override
+   public void run() {
+     LOG.info("Twill log poller thread '{}' started.", getName());
+     kafkaClient.startAndWait();
+     try {
+       Gson gson = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
+                                    .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+                                    .create();
+
+       while (running && !isInterrupted()) {
+         long offset;
+         try {
+           // Get the earliest offset
+           long[] offsets = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1).get(LOG_FETCH_TIMEOUT_MS,
+                                                                                    TimeUnit.MILLISECONDS);
+           // Should have one entry
+           offset = offsets[0];
+         } catch (Throwable t) {
+           // Keep retrying
+           LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
+           pauseBeforeRetry();
+           continue;
+         }
+
+         // Now fetch log messages from Kafka
+         Iterator<FetchedMessage> messageIterator = kafkaClient.consume(Constants.LOG_TOPIC, 0,
+                                                                        offset, MAX_KAFKA_FETCH_SIZE);
+         try {
+           while (messageIterator.hasNext()) {
+             String json = Charsets.UTF_8.decode(messageIterator.next().getBuffer()).toString();
+             try {
+               LogEntry entry = gson.fromJson(json, LogEntry.class);
+               if (entry != null) {
+                 invokeHandlers(entry);
+               }
+             } catch (Exception e) {
+               // A bad entry or a failing handler must not stop the remaining entries.
+               LOG.error("Failed to decode log entry {}", json, e);
+             }
+           }
+         } catch (Throwable t) {
+           LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
+           pauseBeforeRetry();
+         }
+       }
+     } finally {
+       // Always release the Kafka client, even if the loop is left through an exception.
+       kafkaClient.stopAndWait();
+       LOG.info("Twill log poller thread stopped.");
+     }
+   }
+
+   /**
+    * Requests the polling loop to end and interrupts the thread in case it is blocked.
+    */
+   void terminate() {
+     running = false;
+     interrupt();
+   }
+
+   /**
+    * Sleeps for {@code RETRY_DELAY_MS} unless termination was requested. An interrupt during the
+    * sleep is restored on the thread so that the loop condition sees it and exits.
+    */
+   private void pauseBeforeRetry() {
+     if (!running) {
+       return;
+     }
+     try {
+       TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   private void invokeHandlers(LogEntry entry) {
+     for (LogHandler handler : logHandlers) {
+       handler.onLog(entry);
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
new file mode 100644
index 0000000..98cc2b8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.json.StateNodeCodec;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.Messages;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.GsonBuilder;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
+ * communicating with the remote service. This is designed to work in pair with the {@link ZKServiceDecorator}.
+ */
+public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);
+
+ private final ZKClient zkClient;
+ private final InstanceNodeDataCallback instanceNodeDataCallback;
+ private final StateNodeDataCallback stateNodeDataCallback;
+ private final List<ListenableFuture<?>> messageFutures;
+ private ListenableFuture<State> stopMessageFuture;
+
+ /**
+ * Creates a controller for the given run that talks to the remote service through the given ZooKeeper client.
+ */
+ protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
+ super(runId);
+ this.zkClient = zkClient;
+ this.instanceNodeDataCallback = new InstanceNodeDataCallback();
+ this.stateNodeDataCallback = new StateNodeDataCallback();
+ this.messageFutures = Lists.newLinkedList();
+ }
+
+ /**
+ * Sends the command wrapped in a message created by {@code Messages.createForAll}.
+ *
+ * @return the future from {@link #sendMessage}, which carries the command itself as its result value
+ */
+ @Override
+ public final ListenableFuture<Command> sendCommand(Command command) {
+ return sendMessage(Messages.createForAll(command), command);
+ }
+
+ /**
+ * Sends the command wrapped in a message created by {@code Messages.createForRunnable} for the named runnable.
+ *
+ * @return the future from {@link #sendMessage}, which carries the command itself as its result value
+ */
+ @Override
+ public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
+ return sendMessage(Messages.createForRunnable(runnableName, command), command);
+ }
+
+ @Override
+ protected final void startUp() {
+ // Watch for instance node existence.
+ actOnExists(getInstancePath(), new Runnable() {
+ @Override
+ public void run() {
+ watchInstanceNode();
+ }
+ });
+
+ // Watch for state node data
+ actOnExists(getZKPath("state"), new Runnable() {
+ @Override
+ public void run() {
+ watchStateNode();
+ }
+ });
+
+ doStartUp();
+ }
+
+ /**
+ * Sends the stop-application message (unless a stop future has already been set), cancels all
+ * tracked message futures and then calls {@link #doShutDown()}.
+ */
+ @Override
+ protected final synchronized void shutDown() {
+ // A non-null future means forceShutDown() already supplied one, so no stop message is sent.
+ if (stopMessageFuture == null) {
+ stopMessageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(),
+ SystemMessages.stopApplication(), State.TERMINATED);
+ }
+
+ // Cancel all pending message futures.
+ for (ListenableFuture<?> future : messageFutures) {
+ future.cancel(true);
+ }
+
+ doShutDown();
+ }
+
+ /**
+ * Sends a {@link Message} to the remote service. Returns a future that will be completed when the message
+ * has been processed. If this controller is not running, an already-failed future carrying an
+ * {@link IllegalStateException} is returned and nothing is sent.
+ * @param message The message to send.
+ * @param result Object to set into the future when message is being processed.
+ * @param <V> Type of the result.
+ * @return A {@link ListenableFuture} that will be completed when the message has been processed.
+ */
+ protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
+ if (!isRunning()) {
+ return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
+ }
+ final ListenableFuture<V> messageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(), message, result);
+ // Track the future so that shutDown() can cancel it if it is still pending.
+ messageFutures.add(messageFuture);
+ messageFuture.addListener(new Runnable() {
+ @Override
+ public void run() {
+ // If the completion is triggered when stopping, do nothing.
+ // NOTE(review): presumably this avoids removing from messageFutures while shutDown() is iterating
+ // over it and cancelling, since this listener runs on the completing thread - confirm.
+ if (state() == State.STOPPING) {
+ return;
+ }
+ synchronized (AbstractZKServiceController.this) {
+ messageFutures.remove(messageFuture);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ return messageFuture;
+ }
+
+ /**
+ * Returns the future for the stop message, or {@code null} if neither {@code shutDown()} nor
+ * {@code forceShutDown()} has set one yet. The field is read here without holding the lock.
+ */
+ protected final ListenableFuture<State> getStopMessageFuture() {
+ return stopMessageFuture;
+ }
+
+ /**
+ * Called during startup. Executed in the startup thread.
+ */
+ protected abstract void doStartUp();
+
+ /**
+ * Called during shutdown. Executed in the shutdown thread.
+ */
+ protected abstract void doShutDown();
+
+ /**
+ * Called when an update on the live instance node is detected.
+ * @param nodeData The updated live instance node data or {@code null} if there is an error when fetching
+ * the node data.
+ */
+ protected abstract void instanceNodeUpdated(NodeData nodeData);
+
+ /**
+ * Called when an update on the state node is detected.
+ * @param stateNode The update state node data or {@code null} if there is an error when fetching the node data.
+ */
+ protected abstract void stateNodeUpdated(StateNode stateNode);
+
+ /**
+ * Stops this controller without sending a stop message to the remote service. If no stop future
+ * has been set yet, an already-completed one is installed so that {@code shutDown()} skips the send.
+ */
+ protected synchronized void forceShutDown() {
+ if (stopMessageFuture == null) {
+ // In force shutdown, don't send message.
+ stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
+ }
+ stop();
+ }
+
+
+ private void actOnExists(final String path, final Runnable action) {
+ // Watch for node existence.
+ final AtomicBoolean nodeExists = new AtomicBoolean(false);
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // When node is created, call the action.
+ // Other event type would be handled by the action.
+ if (event.getType() == Event.EventType.NodeCreated && nodeExists.compareAndSet(false, true)) {
+ action.run();
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ if (result != null && nodeExists.compareAndSet(false, true)) {
+ action.run();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
+ forceShutDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+  * Fetches the instance node data and leaves a watcher on the node. A data change re-reads the
+  * node and renews the watch; deletion of the node forces this controller to shut down.
+  */
+ private void watchInstanceNode() {
+   Watcher instanceWatcher = new Watcher() {
+     @Override
+     public void process(WatchedEvent event) {
+       if (!isAcceptingNodeEvents()) {
+         // Ignore ZK node events when it is in stopping sequence.
+         return;
+       }
+       switch (event.getType()) {
+         case NodeDataChanged:
+           watchInstanceNode();
+           break;
+         case NodeDeleted:
+           // When the ephemeral node goes away, treat the remote service stopped.
+           forceShutDown();
+           break;
+         default:
+           LOG.info("Ignore ZK event for instance node: {}", event);
+       }
+     }
+   };
+   Futures.addCallback(zkClient.getData(getInstancePath(), instanceWatcher),
+                       instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+  * Fetches the state node data and leaves a watcher on the node. A data change re-reads the
+  * node and renews the watch; every other event is only logged.
+  */
+ private void watchStateNode() {
+   Watcher stateWatcher = new Watcher() {
+     @Override
+     public void process(WatchedEvent event) {
+       if (!isAcceptingNodeEvents()) {
+         // Ignore ZK node events when it is in stopping sequence.
+         return;
+       }
+       switch (event.getType()) {
+         case NodeDataChanged:
+           watchStateNode();
+           break;
+         default:
+           LOG.info("Ignore ZK event for state node: {}", event);
+       }
+     }
+   };
+   Futures.addCallback(zkClient.getData(getZKPath("state"), stateWatcher),
+                       stateNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+  * Tells whether ZooKeeper node events should still be acted upon, which is the case only
+  * while this controller is new, starting or running.
+  */
+ private boolean isAcceptingNodeEvents() {
+   State current = state();
+   return current == State.NEW || current == State.STARTING || current == State.RUNNING;
+ }
+
+ /**
+ * Returns the path prefix for creating sequential message node for the remote service.
+ */
+ private String getMessagePrefix() {
+ return getZKPath("messages/msg");
+ }
+
+ /**
+ * Returns the zookeeper node path for the ephemeral instance node for this runId.
+ */
+ private String getInstancePath() {
+ return String.format("/instances/%s", getRunId().getId());
+ }
+
+ /**
+ * Returns the given relative path placed under the node named after this runId, i.e. {@code /<runId>/<path>}.
+ */
+ private String getZKPath(String path) {
+ return String.format("/%s/%s", getRunId().getId(), path);
+ }
+
+ private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
+
+ @Override
+ public void onSuccess(NodeData result) {
+ instanceNodeUpdated(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed in fetching instance node data.", t);
+ if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
+ // If the node is gone, treat the remote service stopped.
+ forceShutDown();
+ } else {
+ instanceNodeUpdated(null);
+ }
+ }
+ }
+
+ /**
+  * Callback for reads of the state node: decodes the JSON content into a {@link StateNode} and
+  * passes it to {@link #stateNodeUpdated}; {@code null} is passed when the node has no data or
+  * the read failed.
+  */
+ private final class StateNodeDataCallback implements FutureCallback<NodeData> {
+
+   @Override
+   public void onSuccess(NodeData result) {
+     byte[] data = result.getData();
+     stateNodeUpdated(data == null ? null : decode(data));
+   }
+
+   @Override
+   public void onFailure(Throwable t) {
+     LOG.error("Failed in fetching state node data.", t);
+     stateNodeUpdated(null);
+   }
+
+   /** Parses UTF-8 encoded JSON bytes into a {@link StateNode}. */
+   private StateNode decode(byte[] data) {
+     String json = new String(data, Charsets.UTF_8);
+     return new GsonBuilder()
+       .registerTypeAdapter(StateNode.class, new StateNodeCodec())
+       .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+       .create()
+       .fromJson(json, StateNode.class);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
new file mode 100644
index 0000000..a0e9a71
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.utils.Dependencies;
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+
+/**
+ * This class builds jar files based on class dependencies.
+ */
+public final class ApplicationBundler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
+
+ public static final String SUBDIR_CLASSES = "classes/";
+ public static final String SUBDIR_LIB = "lib/";
+ public static final String SUBDIR_RESOURCES = "resources/";
+
+ private final List<String> excludePackages;
+ private final List<String> includePackages;
+ private final Set<String> bootstrapClassPaths;
+ private final CRC32 crc32;
+
+ /**
+  * Constructs an ApplicationBundler that excludes the given packages and has no include packages.
+  *
+  * @param excludePackages Class packages to exclude
+  */
+ public ApplicationBundler(Iterable<String> excludePackages) {
+   // Delegate with an empty include list; the two-argument constructor copies it anyway.
+   this(excludePackages, Collections.<String>emptyList());
+ }
+
+ /**
+  * Constructs an ApplicationBundler.
+  *
+  * Besides copying the package lists, this records the absolute and canonical path of every element
+  * of the JVM bootstrap class path so that classes loaded from there can be skipped when bundling.
+  *
+  * @param excludePackages Class packages to exclude
+  * @param includePackages Class packages that should be included. Anything in this list will override the
+  *                        one provided in excludePackages.
+  */
+ public ApplicationBundler(Iterable<String> excludePackages, Iterable<String> includePackages) {
+   this.excludePackages = ImmutableList.copyOf(excludePackages);
+   this.includePackages = ImmutableList.copyOf(includePackages);
+
+   ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+   // "sun.boot.class.path" is a JVM-specific property and is not defined on every runtime
+   // (e.g. Java 9 and later). Without this guard Splitter.split(null) throws NullPointerException.
+   String bootClassPath = System.getProperty("sun.boot.class.path");
+   if (bootClassPath != null) {
+     for (String classpath : Splitter.on(File.pathSeparatorChar).split(bootClassPath)) {
+       File file = new File(classpath);
+       builder.add(file.getAbsolutePath());
+       try {
+         builder.add(file.getCanonicalPath());
+       } catch (IOException e) {
+         // Ignore the exception and proceed; the absolute path has already been recorded.
+       }
+     }
+   }
+   this.bootstrapClassPaths = builder.build();
+   this.crc32 = new CRC32();
+ }
+
+ /**
+  * Creates a bundle jar for the given classes without any extra resources.
+  *
+  * @param target Where to save the target jar file.
+  * @param classes Set of classes to start the dependency traversal.
+  * @throws IOException if creating the bundle fails
+  */
+ public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
+   createBundle(target, classes, Collections.<URI>emptyList());
+ }
+
+ /**
+  * Same as calling {@link #createBundle(Location, Iterable)} with a set made of {@code clz}
+  * followed by {@code classes}.
+  */
+ public void createBundle(Location target, Class<?> clz, Class<?>...classes) throws IOException {
+   ImmutableSet.Builder<Class<?>> startClasses = ImmutableSet.builder();
+   startClasses.add(clz);
+   startClasses.add(classes);
+   createBundle(target, startClasses.build());
+ }
+
+ /**
+  * Creates a jar file which includes all the given classes and all the classes that they depended on.
+  * The jar will also include all classes and resources under the packages as given as include packages
+  * in the constructor.
+  *
+  * The jar is first built in a local temporary file, which is then copied to {@code target} and removed.
+  *
+  * @param target Where to save the target jar file.
+  * @param classes Set of classes to start the dependency traversal.
+  * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
+  *                  lib/ entry, otherwise under the resources/ entry.
+  * @throws IOException if building the temporary jar or copying it to the target fails
+  */
+ public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
+   LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
+   // File.createTempFile rejects prefixes shorter than three characters with an
+   // IllegalArgumentException, so pad very short target names.
+   String tmpPrefix = target.getName();
+   if (tmpPrefix.length() < 3) {
+     tmpPrefix = tmpPrefix + "___";
+   }
+   // Write the jar to local tmp file first
+   File tmpJar = File.createTempFile(tmpPrefix, ".tmp");
+   try {
+     Set<String> entries = Sets.newHashSet();
+     JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(tmpJar));
+     try {
+       // Find class dependencies
+       findDependencies(classes, entries, jarOut);
+
+       // Add extra resources
+       for (URI resource : resources) {
+         copyResource(resource, entries, jarOut);
+       }
+     } finally {
+       jarOut.close();
+     }
+     LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
+     // Copy the tmp jar into destination.
+     OutputStream os = null;
+     try {
+       os = new BufferedOutputStream(target.getOutputStream());
+       Files.copy(tmpJar, os);
+     } catch (IOException e) {
+       throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
+     } finally {
+       if (os != null) {
+         os.close();
+       }
+     }
+     LOG.debug("finished creating bundle at {}", target.toURI());
+   } finally {
+     // Only report a successful cleanup when the file was actually removed.
+     if (tmpJar.delete()) {
+       LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
+     } else {
+       LOG.warn("failed to delete local temporary for bundle {}", tmpJar.toURI());
+     }
+   }
+ }
+
+ /**
+  * Traverses the dependencies of the given classes and writes every accepted class into the jar.
+  * A class is rejected when its class path is a bootstrap class path, or when it matches an
+  * exclude package without also matching an include package.
+  */
+ private void findDependencies(Iterable<Class<?>> classes, final Set<String> entries,
+                               final JarOutputStream jarOut) throws IOException {
+
+   Function<Class<?>, String> toClassName = new Function<Class<?>, String>() {
+     @Override
+     public String apply(Class<?> input) {
+       return input.getName();
+     }
+   };
+
+   Dependencies.ClassAcceptor acceptor = new Dependencies.ClassAcceptor() {
+     @Override
+     public boolean accept(String className, URL classUrl, URL classPathUrl) {
+       if (bootstrapClassPaths.contains(classPathUrl.getFile())) {
+         return false;
+       }
+
+       // Include packages take precedence over exclude packages.
+       boolean explicitlyIncluded = false;
+       for (String include : includePackages) {
+         explicitlyIncluded = className.startsWith(include);
+         if (explicitlyIncluded) {
+           break;
+         }
+       }
+
+       if (!explicitlyIncluded) {
+         for (String exclude : excludePackages) {
+           if (className.startsWith(exclude)) {
+             return false;
+           }
+         }
+       }
+
+       putEntry(className, classUrl, classPathUrl, entries, jarOut);
+       return true;
+     }
+   };
+
+   // Prefer the thread context class loader; fall back to the loader of this class.
+   ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+   ClassLoader classLoader = (contextLoader != null) ? contextLoader : getClass().getClassLoader();
+
+   Dependencies.findClassDependencies(classLoader, acceptor, Iterables.transform(classes, toClassName));
+ }
+
+ /**
+  * Adds the given class to the jar output. A class coming from a jar causes the whole jar to be
+  * stored under lib/; a class coming from a local directory causes the whole directory to be copied
+  * under classes/; any other class is stored as a single entry under classes/.
+  */
+ private void putEntry(String className, URL classUrl, URL classPathUrl, Set<String> entries, JarOutputStream jarOut) {
+   String classPath = classPathUrl.getFile();
+
+   if (classPath.endsWith(".jar")) {
+     String jarName = classPath.substring(classPath.lastIndexOf('/') + 1);
+     saveDirEntry(SUBDIR_LIB, entries, jarOut);
+     saveEntry(SUBDIR_LIB + jarName, classPathUrl, entries, jarOut, false);
+     return;
+   }
+
+   // Class file, put it under the classes directory
+   saveDirEntry(SUBDIR_CLASSES, entries, jarOut);
+
+   if (!"file".equals(classPathUrl.getProtocol())) {
+     String entry = SUBDIR_CLASSES + className.replace('.', '/') + ".class";
+     String parentDir = entry.substring(0, entry.lastIndexOf('/') + 1);
+     saveDirEntry(parentDir, entries, jarOut);
+     saveEntry(entry, classUrl, entries, jarOut, true);
+     return;
+   }
+
+   // Copy every files under the classPath
+   try {
+     copyDir(new File(classPathUrl.toURI()), SUBDIR_CLASSES, entries, jarOut);
+   } catch (Exception e) {
+     throw Throwables.propagate(e);
+   }
+ }
+
+ /**
+  * Writes an empty, uncompressed directory entry for the given path and for each of its parent
+  * directories, skipping the ones already recorded in {@code entries}.
+  */
+ private void saveDirEntry(String path, Set<String> entries, JarOutputStream jarOut) {
+   if (entries.contains(path)) {
+     return;
+   }
+
+   StringBuilder current = new StringBuilder();
+   try {
+     for (String segment : Splitter.on('/').omitEmptyStrings().split(path)) {
+       String dirEntry = current.append(segment).append('/').toString();
+       if (!entries.add(dirEntry)) {
+         // This level has been written before.
+         continue;
+       }
+       JarEntry jarEntry = new JarEntry(dirEntry);
+       jarEntry.setMethod(JarEntry.STORED);
+       jarEntry.setSize(0L);
+       jarEntry.setCrc(0L);
+       jarOut.putNextEntry(jarEntry);
+       jarOut.closeEntry();
+     }
+   } catch (IOException e) {
+     throw Throwables.propagate(e);
+   }
+ }
+
+ /**
+  * Saves the content read from the given URL as an entry in the jar output. Nothing is written
+  * (or logged) if an entry of the same name has been saved before.
+  *
+  * @param entry name of the jar entry
+  * @param url location to read the entry content from
+  * @param entries names of the entries written so far; updated by this call
+  * @param jarOut the jar output to write to
+  * @param compress if {@code true} the content is streamed with the default entry method; otherwise
+  *                 it is buffered in memory so that size and CRC can be set for a STORED entry
+  */
+ private void saveEntry(String entry, URL url, Set<String> entries, JarOutputStream jarOut, boolean compress) {
+   if (!entries.add(entry)) {
+     return;
+   }
+   // Log only after the duplicate check: this method is invoked once per class, so logging
+   // beforehand reports the same jar as "added" over and over.
+   LOG.debug("adding bundle entry {}", entry);
+   try {
+     JarEntry jarEntry = new JarEntry(entry);
+     InputStream is = url.openStream();
+
+     try {
+       if (compress) {
+         jarOut.putNextEntry(jarEntry);
+         ByteStreams.copy(is, jarOut);
+       } else {
+         // STORED entries need size and CRC before the entry is opened, hence the in-memory buffer.
+         crc32.reset();
+         TransferByteOutputStream os = new TransferByteOutputStream();
+         CheckedOutputStream checkedOut = new CheckedOutputStream(os, crc32);
+         ByteStreams.copy(is, checkedOut);
+         checkedOut.close();
+
+         long size = os.size();
+         jarEntry.setMethod(JarEntry.STORED);
+         jarEntry.setSize(size);
+         jarEntry.setCrc(checkedOut.getChecksum().getValue());
+         jarOut.putNextEntry(jarEntry);
+         os.transfer(jarOut);
+       }
+     } finally {
+       is.close();
+     }
+     jarOut.closeEntry();
+   } catch (Exception e) {
+     throw Throwables.propagate(e);
+   }
+ }
+
+
+ /**
+  * Copies all entries under the file path into the jar output. Every file and directory below
+  * {@code baseDir} becomes an entry named {@code entryPrefix} plus its path relative to
+  * {@code baseDir}; entries already recorded in {@code entries} are not written again.
+  *
+  * @param baseDir directory whose content is copied
+  * @param entryPrefix prefix for the name of every entry written
+  * @param entries names of the entries written so far; updated by this call
+  * @param jarOut the jar output to write to
+  * @throws IOException if writing to the jar output or reading a file fails
+  */
+ private void copyDir(File baseDir, String entryPrefix,
+                      Set<String> entries, JarOutputStream jarOut) throws IOException {
+   LOG.debug("adding whole dir {} to bundle at '{}'", baseDir, entryPrefix);
+   URI baseUri = baseDir.toURI();
+   Queue<File> queue = Lists.newLinkedList();
+   // File.listFiles() returns null when the path is not a directory or cannot be read.
+   // Handle it the same way as for sub-directories below instead of failing with an NPE.
+   File[] baseFiles = baseDir.listFiles();
+   if (baseFiles == null) {
+     LOG.warn("unable to list files under {}, nothing is added to bundle at '{}'", baseDir, entryPrefix);
+     return;
+   }
+   Collections.addAll(queue, baseFiles);
+   while (!queue.isEmpty()) {
+     File file = queue.remove();
+
+     String entry = entryPrefix + baseUri.relativize(file.toURI()).getPath();
+     if (entries.add(entry)) {
+       jarOut.putNextEntry(new JarEntry(entry));
+       if (file.isFile()) {
+         try {
+           Files.copy(file, jarOut);
+         } catch (IOException e) {
+           throw new IOException("failure copying from " + file.getAbsoluteFile() + " to JAR file entry " + entry, e);
+         }
+       }
+       jarOut.closeEntry();
+     }
+
+     if (file.isDirectory()) {
+       File[] files = file.listFiles();
+       if (files != null) {
+         queue.addAll(Arrays.asList(files));
+       }
+     }
+   }
+ }
+
+ /**
+  * Copies an extra resource into the jar output. A local directory is copied recursively under
+  * resources/. Anything else is stored as a single entry named after the last path segment of the
+  * resource, under lib/ if the name ends with ".jar" and under resources/ otherwise. A resource
+  * whose entry name has been written before is skipped.
+  *
+  * @param resource location of the resource
+  * @param entries names of the entries written so far; updated by this call
+  * @param jarOut the jar output to write to
+  * @throws IOException if reading the resource or writing to the jar output fails
+  */
+ private void copyResource(URI resource, Set<String> entries, JarOutputStream jarOut) throws IOException {
+   if ("file".equals(resource.getScheme())) {
+     File file = new File(resource);
+     if (file.isDirectory()) {
+       saveDirEntry(SUBDIR_RESOURCES, entries, jarOut);
+       copyDir(file, SUBDIR_RESOURCES, entries, jarOut);
+       return;
+     }
+   }
+
+   URL url = resource.toURL();
+   String path = url.getFile();
+   String prefix = path.endsWith(".jar") ? SUBDIR_LIB : SUBDIR_RESOURCES;
+   path = prefix + path.substring(path.lastIndexOf('/') + 1);
+
+   // The same entry may already have been written (e.g. a jar picked up during the dependency
+   // traversal); writing it again would make putNextEntry fail with a duplicate entry error.
+   if (!entries.add(path)) {
+     return;
+   }
+
+   saveDirEntry(prefix, entries, jarOut);
+   jarOut.putNextEntry(new JarEntry(path));
+   InputStream is = url.openStream();
+   try {
+     ByteStreams.copy(is, jarOut);
+   } finally {
+     is.close();
+   }
+   jarOut.closeEntry();
+ }
+
+ /**
+  * A {@link ByteArrayOutputStream} that can hand its buffered bytes to another stream without
+  * first copying them into a new array.
+  */
+ private static final class TransferByteOutputStream extends ByteArrayOutputStream {
+
+   /**
+    * Writes all bytes buffered so far to the given stream.
+    */
+   public void transfer(OutputStream os) throws IOException {
+     writeTo(os);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Arguments.java b/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
new file mode 100644
index 0000000..a78547c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.List;
+
+/**
+ * Class that encapsulates application arguments and per runnable arguments.
+ * Both collections are copied into immutable collections at construction time.
+ */
+public final class Arguments {
+
+ private final List<String> arguments;
+ // NOTE(review): presumably keyed by runnable name - confirm against the callers.
+ private final Multimap<String, String> runnableArguments;
+
+ /**
+ * Creates an instance holding immutable copies of the given collections.
+ *
+ * @param arguments the application arguments
+ * @param runnableArguments the per runnable arguments
+ */
+ public Arguments(List<String> arguments, Multimap<String, String> runnableArguments) {
+ this.arguments = ImmutableList.copyOf(arguments);
+ this.runnableArguments = ImmutableMultimap.copyOf(runnableArguments);
+ }
+
+ /**
+ * Returns the immutable list of application arguments.
+ */
+ public List<String> getArguments() {
+ return arguments;
+ }
+
+ /**
+ * Returns the immutable multimap of per runnable arguments.
+ */
+ public Multimap<String, String> getRunnableArguments() {
+ return runnableArguments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
new file mode 100644
index 0000000..61bdaef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link TwillContext} implementation that simply returns the values given to its constructor.
+ * Only the instance count can be changed after construction, through {@link #setInstanceCount(int)}.
+ */
+public final class BasicTwillContext implements TwillContext {
+
+ private final RunId runId;
+ private final RunId appRunId;
+ private final InetAddress host;
+ private final String[] args;
+ private final String[] appArgs;
+ private final TwillRunnableSpecification spec;
+ private final int instanceId;
+ private final DiscoveryService discoveryService;
+ private final int allowedMemoryMB;
+ private final int virtualCores;
+ // Volatile because it is the only field that can be updated after construction.
+ private volatile int instanceCount;
+
+ /**
+ * Creates a context from the given values. The argument arrays are stored as given, without copying.
+ */
+ public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
+ TwillRunnableSpecification spec, int instanceId, DiscoveryService discoveryService,
+ int instanceCount, int allowedMemoryMB, int virtualCores) {
+ this.runId = runId;
+ this.appRunId = appRunId;
+ this.host = host;
+ this.args = args;
+ this.appArgs = appArgs;
+ this.spec = spec;
+ this.instanceId = instanceId;
+ this.discoveryService = discoveryService;
+ this.instanceCount = instanceCount;
+ this.allowedMemoryMB = allowedMemoryMB;
+ this.virtualCores = virtualCores;
+ }
+
+ @Override
+ public RunId getRunId() {
+ return runId;
+ }
+
+ @Override
+ public RunId getApplicationRunId() {
+ return appRunId;
+ }
+
+ @Override
+ public int getInstanceCount() {
+ return instanceCount;
+ }
+
+ /**
+ * Updates the value returned by {@link #getInstanceCount()}.
+ */
+ public void setInstanceCount(int count) {
+ this.instanceCount = count;
+ }
+
+ @Override
+ public InetAddress getHost() {
+ return host;
+ }
+
+ // Returns the internal array itself, not a copy.
+ @Override
+ public String[] getArguments() {
+ return args;
+ }
+
+ // Returns the internal array itself, not a copy.
+ @Override
+ public String[] getApplicationArguments() {
+ return appArgs;
+ }
+
+ @Override
+ public TwillRunnableSpecification getSpecification() {
+ return spec;
+ }
+
+ @Override
+ public int getInstanceId() {
+ return instanceId;
+ }
+
+ @Override
+ public int getVirtualCores() {
+ return virtualCores;
+ }
+
+ @Override
+ public int getMaxMemoryMB() {
+ return allowedMemoryMB;
+ }
+
+ /**
+ * Registers with the discovery service a {@link Discoverable} that has the given service name and
+ * a socket address made of the host of this context and the given port.
+ *
+ * @return the {@link Cancellable} returned by the discovery service for the registration
+ */
+ @Override
+ public Cancellable announce(final String serviceName, final int port) {
+ return discoveryService.register(new Discoverable() {
+ @Override
+ public String getName() {
+ return serviceName;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(getHost(), port);
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Configs.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Configs.java b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
new file mode 100644
index 0000000..0fa1df8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Holder of configuration key names ({@link Keys}) and their default values ({@link Defaults}).
+ * This class and its nested classes cannot be instantiated.
+ */
+public final class Configs {
+
+ /**
+ * Names of the configuration keys.
+ */
+ public static final class Keys {
+ /**
+ * Size in MB of reserved memory for Java process (non-heap memory).
+ */
+ public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
+
+ private Keys() {
+ }
+ }
+
+ /**
+ * Default values; each constant has the same name as its key in {@link Keys}.
+ */
+ public static final class Defaults {
+ // By default have 200MB reserved for Java process.
+ public static final int JAVA_RESERVED_MEMORY_MB = 200;
+
+ private Defaults() {
+ }
+ }
+
+ private Configs() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..0387d3e
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ * This class and its nested class cannot be instantiated.
+ */
+public final class Constants {
+
+ public static final String LOG_TOPIC = "log";
+
+ /** Maximum number of seconds for AM to start. */
+ public static final int APPLICATION_MAX_START_SECONDS = 60;
+ /** Maximum number of seconds for AM to stop. */
+ public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+ // NOTE(review): the unit is not stated here; presumably milliseconds - confirm against the usages.
+ public static final long PROVISION_TIMEOUT = 30000;
+
+ /** Memory size of AM */
+ public static final int APP_MASTER_MEMORY_MB = 512;
+
+ // NOTE(review): presumably the part of APP_MASTER_MEMORY_MB kept outside of the Java heap - confirm.
+ public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+ public static final String STDOUT = "stdout";
+ public static final String STDERR = "stderr";
+
+ /**
+ * Constants for names of internal files that are shared between client, AM and containers.
+ */
+ public static final class Files {
+
+ public static final String LAUNCHER_JAR = "launcher.jar";
+ public static final String APP_MASTER_JAR = "appMaster.jar";
+ public static final String CONTAINER_JAR = "container.jar";
+ public static final String LOCALIZE_FILES = "localizeFiles.json";
+ public static final String TWILL_SPEC = "twillSpec.json";
+ public static final String ARGUMENTS = "arguments.json";
+ public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+ public static final String KAFKA = "kafka.tgz";
+ public static final String JVM_OPTIONS = "jvm.opts";
+ public static final String CREDENTIALS = "credentials.store";
+
+ private Files() {
+ }
+ }
+
+ private Constants() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
new file mode 100644
index 0000000..67c21d3
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import java.net.InetAddress;
+
+/**
+ * Represents information of the container that the processing is/will be running in.
+ */
+public interface ContainerInfo {
+
+ /** Returns the id of the container. */
+ String getId();
+
+ /** Returns the host of the container. */
+ InetAddress getHost();
+
+ /** Returns the port of the container. */
+ int getPort();
+
+ /** Returns the memory size of the container in MB. */
+ int getMemoryMB();
+
+ /** Returns the number of virtual cores of the container. */
+ int getVirtualCores();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
new file mode 100644
index 0000000..705943c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Immutable holder of a container id together with a host string.
+ * NOTE(review): judging by the name this is the content of a container's live node - confirm.
+ */
+public final class ContainerLiveNodeData {
+
+ private final String containerId;
+ private final String host;
+
+ public ContainerLiveNodeData(String containerId, String host) {
+ this.containerId = containerId;
+ this.host = host;
+ }
+
+ /** Returns the container id given at construction. */
+ public String getContainerId() {
+ return containerId;
+ }
+
+ /** Returns the host given at construction. */
+ public String getHost() {
+ return host;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
new file mode 100644
index 0000000..fd50028
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A {@link ContainerInfo} based on information on the environment. All values are read once,
+ * at construction time, from the {@code YARN_CONTAINER_*} environment variables named in {@link EnvKeys}.
+ */
+public final class EnvContainerInfo implements ContainerInfo {
+  private final String id;
+  private final InetAddress host;
+  private final int port;
+  private final int virtualCores;
+  private final int memoryMB;
+
+  /**
+   * Reads the container information from the environment.
+   *
+   * @throws UnknownHostException if the address of the container host cannot be determined
+   * @throws NumberFormatException if the port, virtual cores or memory variable is missing or is
+   *                               not an integer; the message names the offending variable
+   */
+  public EnvContainerInfo() throws UnknownHostException {
+    id = System.getenv(EnvKeys.YARN_CONTAINER_ID);
+    // NOTE(review): InetAddress.getByName(null) resolves to the loopback address, so a missing
+    // host variable goes unnoticed here - confirm whether that is intended.
+    host = InetAddress.getByName(System.getenv(EnvKeys.YARN_CONTAINER_HOST));
+    port = intFromEnv(EnvKeys.YARN_CONTAINER_PORT);
+    virtualCores = intFromEnv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES);
+    memoryMB = intFromEnv(EnvKeys.YARN_CONTAINER_MEMORY_MB);
+  }
+
+  /**
+   * Parses the given environment variable as an int, reporting the variable name on failure.
+   */
+  private static int intFromEnv(String key) {
+    String value = System.getenv(key);
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      // Same exception type as before, but with a message that tells which variable is at fault.
+      NumberFormatException failure =
+        new NumberFormatException("Environment variable " + key + " must be an integer, but was: " + value);
+      failure.initCause(e);
+      throw failure;
+    }
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public InetAddress getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return virtualCores;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
new file mode 100644
index 0000000..9bf6523
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Place for defining common environment keys. Each constant has its own name as value.
+ * This class cannot be instantiated.
+ */
+public final class EnvKeys {
+
+ public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT";
+ public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID";
+ public static final String TWILL_RUN_ID = "TWILL_RUN_ID";
+ public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID";
+ public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT";
+ public static final String TWILL_RESERVED_MEMORY_MB = "TWILL_RESERVED_MEMORY_MB";
+
+ public static final String TWILL_FS_USER = "TWILL_FS_USER";
+
+ /**
+ * Cluster filesystem directory for storing twill app related files.
+ */
+ public static final String TWILL_APP_DIR = "TWILL_APP_DIR";
+
+ public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
+ public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
+
+ public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
+
+ public static final String YARN_APP_ID = "YARN_APP_ID";
+ public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
+ public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
+
+ public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID";
+ public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST";
+ public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT";
+ /**
+ * Used to inform runnables of their resource usage.
+ */
+ public static final String YARN_CONTAINER_VIRTUAL_CORES = "YARN_CONTAINER_VIRTUAL_CORES";
+ public static final String YARN_CONTAINER_MEMORY_MB = "YARN_CONTAINER_MEMORY_MB";
+
+ private EnvKeys() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
new file mode 100644
index 0000000..9d3e156
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link Service.Listener} decorator that hands every callback to a delegate listener through a
+ * supplied {@link Executor}. Each callback is forwarded at most once; {@code terminated} and
+ * {@code failed} share one slot, so only the first of the two is ever forwarded.
+ */
+final class ListenerExecutor implements Service.Listener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);
+
+  private final Service.Listener delegate;
+  private final Executor executor;
+  private final ConcurrentMap<Service.State, Boolean> callStates = Maps.newConcurrentMap();
+
+  ListenerExecutor(Service.Listener delegate, Executor executor) {
+    this.delegate = delegate;
+    this.executor = executor;
+  }
+
+  @Override
+  public void starting() {
+    submitOnce(Service.State.STARTING, new Runnable() {
+      @Override
+      public void run() {
+        delegate.starting();
+      }
+    });
+  }
+
+  @Override
+  public void running() {
+    submitOnce(Service.State.RUNNING, new Runnable() {
+      @Override
+      public void run() {
+        delegate.running();
+      }
+    });
+  }
+
+  @Override
+  public void stopping(final Service.State from) {
+    submitOnce(Service.State.STOPPING, new Runnable() {
+      @Override
+      public void run() {
+        delegate.stopping(from);
+      }
+    });
+  }
+
+  @Override
+  public void terminated(final Service.State from) {
+    submitOnce(Service.State.TERMINATED, new Runnable() {
+      @Override
+      public void run() {
+        delegate.terminated(from);
+      }
+    });
+  }
+
+  @Override
+  public void failed(final Service.State from, final Throwable failure) {
+    // Deliberately the same slot as terminated(): a service ends in exactly one of the two.
+    submitOnce(Service.State.TERMINATED, new Runnable() {
+      @Override
+      public void run() {
+        delegate.failed(from, failure);
+      }
+    });
+  }
+
+  /**
+   * Claims the given slot and, if it was still free, submits the callback to the executor.
+   * Anything thrown by the callback is logged and not propagated.
+   *
+   * @param slot state used as the "already forwarded" marker for the callback
+   * @param callback the delegate invocation to run on the executor
+   */
+  private void submitOnce(Service.State slot, final Runnable callback) {
+    if (callStates.putIfAbsent(slot, true) != null) {
+      // Slot already claimed by an earlier call; nothing is forwarded.
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          callback.run();
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java b/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
new file mode 100644
index 0000000..4f71a05
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public final class LogOnlyEventHandler extends EventHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogOnlyEventHandler.class);
+
+ @Override
+ public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+ for (TimeoutEvent event : timeoutEvents) {
+ LOG.info("Requested {} containers for runnable {}, only got {} after {} ms.",
+ event.getExpectedInstances(), event.getRunnableName(),
+ event.getActualInstances(), System.currentTimeMillis() - event.getRequestTime());
+ }
+ return TimeoutAction.recheck(Constants.PROVISION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
new file mode 100644
index 0000000..4453838
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * For controlling a launched yarn process.
+ *
+ * @param <R> Report type.
+ */
+public interface ProcessController<R> extends Cancellable {
+
+ /**
+ * Returns a report of type {@code R}.
+ * NOTE(review): what the report contains and when it is refreshed is not visible from this
+ * interface; confirm against the implementations.
+ */
+ R getReport();
+
+ /**
+ * Request to stop the running process.
+ */
+ void cancel();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
new file mode 100644
index 0000000..e48a226
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.LocalFile;
+
+import java.util.Map;
+
+/**
+ * Interface for launching a container process. The launch is configured through the staged
+ * interfaces nested in {@link PrepareLaunchContext}: resources, then environment, then commands.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public interface ProcessLauncher<T> {
+
+ /**
+ * Returns information about the container that this launcher would launch the process in.
+ */
+ T getContainerInfo();
+
+ /**
+ * Returns a preparer with the given default set of environments, resources and credentials.
+ *
+ * @param <C> type of the credentials object
+ */
+ <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+ Iterable<LocalFile> resources, C credentials);
+
+ /**
+ * For setting up the launcher. This is the first stage; it decides whether resources are added.
+ */
+ interface PrepareLaunchContext {
+
+ /**
+ * Moves to the stage where {@link LocalFile} resources are added.
+ */
+ ResourcesAdder withResources();
+
+ /**
+ * Moves straight to the environment stage without going through {@link ResourcesAdder}.
+ */
+ AfterResources noResources();
+
+ /**
+ * Stage for adding one {@link LocalFile}.
+ */
+ interface ResourcesAdder {
+ MoreResources add(LocalFile localFile);
+ }
+
+ /**
+ * Stage reached once resources are settled; decides whether environment entries are added.
+ */
+ interface AfterResources {
+ EnvironmentAdder withEnvironment();
+
+ AfterEnvironment noEnvironment();
+ }
+
+ /**
+ * Stage for adding one environment entry.
+ * NOTE(review): how a value of arbitrary type {@code V} is turned into an environment string is
+ * not visible here; confirm against the implementations.
+ */
+ interface EnvironmentAdder {
+ <V> MoreEnvironment add(String key, V value);
+ }
+
+ /**
+ * Stage after at least one environment entry: either add another entry or move on to commands.
+ */
+ interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment {
+ }
+
+ /**
+ * Stage reached once the environment is settled; leads to the command stage.
+ */
+ interface AfterEnvironment {
+ CommandAdder withCommands();
+ }
+
+ /**
+ * Stage after at least one resource: either add another resource or move on to the environment.
+ */
+ interface MoreResources extends ResourcesAdder, AfterResources { }
+
+ /**
+ * Stage for adding one command with its arguments.
+ */
+ interface CommandAdder {
+ StdOutSetter add(String cmd, String...args);
+ }
+
+ /**
+ * Stage that settles the standard output of the command just added.
+ * NOTE(review): {@code stdout} is presumably a redirect target such as a file path; confirm.
+ */
+ interface StdOutSetter {
+ StdErrSetter redirectOutput(String stdout);
+
+ StdErrSetter noOutput();
+ }
+
+ /**
+ * Stage that settles the standard error of the command just added.
+ * NOTE(review): {@code stderr} is presumably a redirect target such as a file path; confirm.
+ */
+ interface StdErrSetter {
+ MoreCommand redirectError(String stderr);
+
+ MoreCommand noError();
+ }
+
+ /**
+ * Final stage: either add another command or call {@link #launch()} to obtain a
+ * {@link ProcessController}.
+ */
+ interface MoreCommand extends CommandAdder {
+ <R> ProcessController<R> launch();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java b/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
new file mode 100644
index 0000000..a52afe1
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+
+/**
+ * A {@link TwillApplication} made up of exactly one {@link TwillRunnable}; the application
+ * takes its name from that runnable's specification.
+ */
+public class SingleRunnableApplication implements TwillApplication {
+
+  private final TwillRunnable runnable;
+  private final ResourceSpecification resourceSpec;
+
+  /**
+   * @param runnable the single runnable of this application
+   * @param resourceSpec resource specification registered together with the runnable
+   */
+  public SingleRunnableApplication(TwillRunnable runnable, ResourceSpecification resourceSpec) {
+    this.resourceSpec = resourceSpec;
+    this.runnable = runnable;
+  }
+
+  /**
+   * Builds a specification that holds only the wrapped runnable, declared with
+   * {@code noLocalFiles()} and {@code anyOrder()}. The runnable's configured name serves as
+   * both the application name and the runnable name.
+   */
+  @Override
+  public TwillSpecification configure() {
+    final String name = runnable.configure().getName();
+    return TwillSpecification.Builder.with()
+      .setName(name)
+      .withRunnable().add(name, runnable, resourceSpec)
+      .noLocalFiles()
+      .anyOrder()
+      .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
new file mode 100644
index 0000000..8b090bd
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.internal.state.Message;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link ServiceController} that allows sending a message directly. Internal use only.
+ */
+public interface TwillContainerController extends ServiceController {
+
+ /**
+ * Sends the given message.
+ * NOTE(review): presumably the message is delivered to the container this controller is
+ * associated with and the future completes once it has been handled; confirm against the
+ * implementations.
+ *
+ * @param message the message to send
+ * @return a future carrying a {@link Message}
+ */
+ ListenableFuture<Message> sendMessage(Message message);
+
+ /**
+ * Called to indicate that the container that this controller is associated with has completed.
+ * Any resources it holds will be released and all pending futures will be cancelled.
+ *
+ * @param exitStatus presumably the exit status of the completed container (TODO confirm)
+ */
+ void completed(int exitStatus);
+}