You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:42 UTC
[26/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/common/src/main/java/org/apache/twill/filesystem/Location.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/Location.java b/common/src/main/java/org/apache/twill/filesystem/Location.java
deleted file mode 100644
index dee9546..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/Location.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-
-/**
- * This interface defines the location and operations of a resource on the filesystem.
- * <p>
- * {@link Location} is agnostic to the type of file system the resource is on.
- * </p>
- */
-public interface Location {
- /**
- * Suffix added to every temp file name generated with {@link #getTempFile(String)}.
- */
- static final String TEMP_FILE_SUFFIX = ".tmp";
-
- /**
- * Checks if this location exists.
- *
- * @return true if found; false otherwise.
- * @throws IOException if the existence check fails.
- */
- boolean exists() throws IOException;
-
- /**
- * @return The name of the file or directory denoted by this abstract pathname.
- */
- String getName();
-
- /**
- * Atomically creates a new, empty file named by this abstract pathname if and only if a file with this name
- * does not yet exist.
- * @return {@code true} if the file is successfully created, {@code false} otherwise.
- * @throws IOException if the file creation fails.
- */
- boolean createNew() throws IOException;
-
- /**
- * @return An {@link java.io.InputStream} for reading from this location.
- * @throws IOException if failed to create the {@link java.io.InputStream}.
- */
- InputStream getInputStream() throws IOException;
-
- /**
- * @return An {@link java.io.OutputStream} for writing to this location.
- * @throws IOException if failed to create the {@link java.io.OutputStream}.
- */
- OutputStream getOutputStream() throws IOException;
-
- /**
- * Creates an {@link OutputStream} for this location with the given permission. The actual permission supported
- * depends on implementation.
- *
- * @param permission A POSIX permission string.
- * @return An {@link OutputStream} for writing to this location.
- * @throws IOException If failed to create the {@link OutputStream}.
- */
- OutputStream getOutputStream(String permission) throws IOException;
-
- /**
- * Appends the child to the current {@link Location}.
- * <p>
- * Returns a new instance of Location.
- * </p>
- *
- * @param child to be appended to this location.
- * @return A new instance of {@link Location}
- * @throws IOException if the child location cannot be created.
- */
- Location append(String child) throws IOException;
-
- /**
- * Returns a unique location for a temporary file to be placed near this location.
- * Allows all temp files to follow the same pattern for easier management of them.
- * See also {@link #TEMP_FILE_SUFFIX}.
- * @param suffix part of the file name to include in the temp file name
- * @return location of the temp file
- * @throws IOException if the temp file location cannot be created.
- */
- Location getTempFile(String suffix) throws IOException;
-
- /**
- * @return A {@link java.net.URI} for this location.
- */
- URI toURI();
-
- /**
- * Deletes the file or directory denoted by this abstract pathname. If this
- * pathname denotes a directory, then the directory must be empty in order
- * to be deleted.
- *
- * @return true if and only if the file or directory is successfully deleted; false otherwise.
- * @throws IOException if the deletion fails with an I/O error.
- */
- boolean delete() throws IOException;
-
- /**
- * Deletes the file or directory denoted by this abstract pathname. If this
- * pathname denotes a directory and {@code recursive} is {@code true}, then the content of the
- * directory will be deleted recursively, otherwise the directory must be empty in order to be deleted.
- * Note that when calling this method with {@code recursive = true} for a directory, a
- * failure during deletion may leave the directory partially deleted: some entries removed, others not.
- *
- * @param recursive Indicates whether to recursively delete a directory. Ignored if the pathname represents a file.
- * @return true if and only if the file or directory is successfully deleted; false otherwise.
- * @throws IOException if the deletion fails with an I/O error.
- */
- boolean delete(boolean recursive) throws IOException;
-
- /**
- * Moves the file or directory denoted by this abstract pathname.
- *
- * @param destination destination location
- * @return new location if and only if the file or directory is successfully moved; null otherwise.
- * @throws IOException if the move fails with an I/O error.
- */
- @Nullable
- Location renameTo(Location destination) throws IOException;
-
- /**
- * Creates the directory named by this abstract pathname, including any necessary
- * but nonexistent parent directories.
- *
- * @return true if and only if the directory creation succeeded; false otherwise
- * @throws IOException if the directory creation fails with an I/O error.
- */
- boolean mkdirs() throws IOException;
-
- /**
- * @return Length of file.
- * @throws IOException if the length cannot be determined.
- */
- long length() throws IOException;
-
- /**
- * @return Last modified time of file.
- * @throws IOException if the last modified time cannot be determined.
- */
- long lastModified() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java b/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
deleted file mode 100644
index 751a632..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Static helpers for building specialised {@link LocationFactory} instances.
- */
-public final class LocationFactories {
-
-  /**
-   * Wraps the given factory so that relative locations are always resolved underneath a fixed namespace.
-   *
-   * @param delegate the factory that actually creates locations
-   * @param namespace the path prefix under which relative paths and relative URIs are resolved
-   * @return a {@link LocationFactory} that prepends {@code namespace}; absolute URIs and the home
-   *         location are passed straight through to {@code delegate}
-   */
-  public static LocationFactory namespace(LocationFactory delegate, final String namespace) {
-    return new ForwardingLocationFactory(delegate) {
-      @Override
-      public Location create(String path) {
-        return underNamespace(path);
-      }
-
-      @Override
-      public Location create(URI uri) {
-        // Absolute URIs already identify their target fully, so the namespace does not apply.
-        return uri.isAbsolute() ? getDelegate().create(uri) : underNamespace(uri.getPath());
-      }
-
-      @Override
-      public Location getHomeLocation() {
-        return getDelegate().getHomeLocation();
-      }
-
-      /**
-       * Resolves a relative path against the namespace location, rethrowing any
-       * {@link IOException} through {@link Throwables#propagate(Throwable)}.
-       */
-      private Location underNamespace(String relativePath) {
-        try {
-          return getDelegate().create(namespace).append(relativePath);
-        } catch (IOException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-  }
-
-  private LocationFactories() {
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java b/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
deleted file mode 100644
index f88d94d..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import java.net.URI;
-
-/**
- * Factory for creating instances of {@link Location}.
- */
-public interface LocationFactory {
-
- /**
- * Creates an instance of {@link Location} for the given path.
- * @param path The path representing the location.
- * @return An instance of {@link Location}.
- */
- Location create(String path);
-
- /**
- * Creates an instance of {@link Location} based on {@link java.net.URI} <code>uri</code>.
- *
- * @param uri to the resource on the filesystem.
- * @return An instance of {@link Location}
- */
- Location create(URI uri);
-
- /**
- * Returns the home location.
- *
- * @return An instance of {@link Location} representing the home location; what "home" means
- *         is defined by the implementation.
- */
- Location getHomeLocation();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/common/src/test/java/org/apache/twill/common/ServicesTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/twill/common/ServicesTest.java b/common/src/test/java/org/apache/twill/common/ServicesTest.java
deleted file mode 100644
index c0aa7ee..0000000
--- a/common/src/test/java/org/apache/twill/common/ServicesTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.common;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Unit test for {@link Services} methods.
- */
-public class ServicesTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ServicesTest.class);
-
-  /**
-   * Starts and then stops three services through the chain helpers. All three share one
-   * "transiting" flag, so any overlap between two start-ups or two shut-downs trips the
-   * state checks inside {@link DummyService}.
-   */
-  @Test
-  public void testChain() throws ExecutionException, InterruptedException {
-    AtomicBoolean transiting = new AtomicBoolean(false);
-    Service s1 = new DummyService("s1", transiting);
-    Service s2 = new DummyService("s2", transiting);
-    Service s3 = new DummyService("s3", transiting);
-
-    Futures.allAsList(Services.chainStart(s1, s2, s3).get()).get();
-    Futures.allAsList(Services.chainStop(s3, s2, s1).get()).get();
-  }
-
-  /**
-   * Verifies the completion future for a clean start/stop cycle and for a service whose shutdown fails.
-   */
-  @Test
-  public void testCompletion() throws ExecutionException, InterruptedException {
-    // Normal life cycle: the completion future must complete without an exception.
-    Service service = new DummyService("s1", new AtomicBoolean());
-    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
-
-    service.start();
-    service.stop();
-
-    completion.get();
-
-    // Failure case: forcing the flag to true makes the state check in shutDown() fail.
-    AtomicBoolean transiting = new AtomicBoolean();
-    service = new DummyService("s2", transiting);
-    completion = Services.getCompletionFuture(service);
-
-    service.startAndWait();
-    transiting.set(true);
-    service.stop();
-
-    try {
-      completion.get();
-      Assert.fail("Expected the completion future to fail because the service failed to shut down");
-    } catch (ExecutionException e) {
-      // Expected
-    }
-  }
-
-  /**
-   * A service whose start-up and shut-down each take about half a second and assert, through the
-   * shared flag, that no other transition using the same flag is in progress at the same time.
-   */
-  private static final class DummyService extends AbstractIdleService {
-
-    private final String name;
-    private final AtomicBoolean transiting;
-
-    private DummyService(String name, AtomicBoolean transiting) {
-      this.name = name;
-      this.transiting = transiting;
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      Preconditions.checkState(transiting.compareAndSet(false, true));
-      LOG.info("Starting: " + name);
-      TimeUnit.MILLISECONDS.sleep(500);
-      LOG.info("Started: " + name);
-      Preconditions.checkState(transiting.compareAndSet(true, false));
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      Preconditions.checkState(transiting.compareAndSet(false, true));
-      LOG.info("Stopping: " + name);
-      TimeUnit.MILLISECONDS.sleep(500);
-      LOG.info("Stopped: " + name);
-      Preconditions.checkState(transiting.compareAndSet(true, false));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
deleted file mode 100644
index 198f77f..0000000
--- a/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Unit tests for locations created through {@link LocalLocationFactory}, both used directly and
- * wrapped by {@link LocationFactories#namespace(LocationFactory, String)}.
- */
-public class LocalLocationTest {
-
-  /**
-   * Builds a small directory tree under the JVM temp directory and checks that a recursive
-   * delete removes it.
-   */
-  @Test
-  public void testDelete() throws IOException {
-    Location root = tmpDirFactory().create("test").getTempFile(".tmp");
-    Assert.assertTrue(root.mkdirs());
-
-    Assert.assertTrue(createTempFile(root, "test1"));
-    Assert.assertTrue(createTempFile(root, "test2"));
-
-    Location nested = root.append("test3");
-    Assert.assertTrue(nested.mkdirs());
-
-    Assert.assertTrue(createTempFile(nested, "test4"));
-    Assert.assertTrue(createTempFile(nested, "test5"));
-
-    Assert.assertTrue(root.delete(true));
-    Assert.assertFalse(root.exists());
-  }
-
-  /**
-   * Checks that a namespaced factory places both relative paths and relative URIs under the namespace.
-   */
-  @Test
-  public void testHelper() {
-    LocationFactory namespaced = LocationFactories.namespace(tmpDirFactory(), "testhelper");
-
-    String fromPath = namespaced.create("test").toURI().getPath();
-    Assert.assertTrue(fromPath.endsWith("testhelper/test"));
-
-    String fromUri = namespaced.create(URI.create("test2")).toURI().getPath();
-    Assert.assertTrue(fromUri.endsWith("testhelper/test2"));
-  }
-
-  /** Creates a factory constructed with the directory named by the {@code java.io.tmpdir} property. */
-  private static LocationFactory tmpDirFactory() {
-    return new LocalLocationFactory(new File(System.getProperty("java.io.tmpdir")));
-  }
-
-  /** Creates a new temp file derived from {@code parent/name} and reports whether creation succeeded. */
-  private static boolean createTempFile(Location parent, String name) throws IOException {
-    return parent.append(name).getTempFile(".tmp").createNew();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
deleted file mode 100644
index faff711..0000000
--- a/core/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>twill-parent</artifactId>
- <groupId>org.apache.twill</groupId>
- <version>0.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>twill-core</artifactId>
- <name>Twill core library</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-zookeeper</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-discovery-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </dependency>
- <dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
deleted file mode 100644
index 974639d..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.Threads;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public abstract class AbstractExecutionServiceController implements ServiceController {
-
- private final RunId runId;
- private final ListenerExecutors listenerExecutors;
- private final Service serviceDelegate;
-
- protected AbstractExecutionServiceController(RunId runId) {
- this.runId = runId;
- this.listenerExecutors = new ListenerExecutors();
- this.serviceDelegate = new ServiceDelegate();
- }
-
- protected abstract void startUp();
-
- protected abstract void shutDown();
-
- @Override
- public final RunId getRunId() {
- return runId;
- }
-
- @Override
- public final void addListener(Listener listener, Executor executor) {
- listenerExecutors.addListener(new ListenerExecutor(listener, executor));
- }
-
- @Override
- public final ListenableFuture<State> start() {
- serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
- return serviceDelegate.start();
- }
-
- @Override
- public final State startAndWait() {
- return Futures.getUnchecked(start());
- }
-
- @Override
- public final boolean isRunning() {
- return serviceDelegate.isRunning();
- }
-
- @Override
- public final State state() {
- return serviceDelegate.state();
- }
-
- @Override
- public final State stopAndWait() {
- return Futures.getUnchecked(stop());
- }
-
- @Override
- public final ListenableFuture<State> stop() {
- return serviceDelegate.stop();
- }
-
- protected Executor executor(final State state) {
- return new Executor() {
- @Override
- public void execute(Runnable command) {
- Thread t = new Thread(command, getClass().getSimpleName() + " " + state);
- t.setDaemon(true);
- t.start();
- }
- };
- }
-
-
- private final class ServiceDelegate extends AbstractIdleService {
- @Override
- protected void startUp() throws Exception {
- AbstractExecutionServiceController.this.startUp();
- }
-
- @Override
- protected void shutDown() throws Exception {
- AbstractExecutionServiceController.this.shutDown();
- }
-
- @Override
- protected Executor executor(State state) {
- return AbstractExecutionServiceController.this.executor(state);
- }
- }
-
- /**
- * Inner class for dispatching listener call back to a list of listeners
- */
- private static final class ListenerExecutors implements Listener {
-
- private interface Callback {
- void call(Listener listener);
- }
-
- private final Queue<ListenerExecutor> listeners = new ConcurrentLinkedQueue<ListenerExecutor>();
- private final AtomicReference<Callback> lastState = new AtomicReference<Callback>();
-
- private synchronized void addListener(final ListenerExecutor listener) {
- listeners.add(listener);
- Callback callback = lastState.get();
- if (callback != null) {
- callback.call(listener);
- }
- }
-
- @Override
- public synchronized void starting() {
- lastState.set(new Callback() {
- @Override
- public void call(Listener listener) {
- listener.starting();
- }
- });
- for (ListenerExecutor listener : listeners) {
- listener.starting();
- }
- }
-
- @Override
- public synchronized void running() {
- lastState.set(new Callback() {
- @Override
- public void call(Listener listener) {
- listener.running();
- }
- });
- for (ListenerExecutor listener : listeners) {
- listener.running();
- }
- }
-
- @Override
- public synchronized void stopping(final State from) {
- lastState.set(new Callback() {
- @Override
- public void call(Listener listener) {
- listener.stopping(from);
- }
- });
- for (ListenerExecutor listener : listeners) {
- listener.stopping(from);
- }
- }
-
- @Override
- public synchronized void terminated(final State from) {
- lastState.set(new Callback() {
- @Override
- public void call(Listener listener) {
- listener.terminated(from);
- }
- });
- for (ListenerExecutor listener : listeners) {
- listener.terminated(from);
- }
- }
-
- @Override
- public synchronized void failed(final State from, final Throwable failure) {
- lastState.set(new Callback() {
- @Override
- public void call(Listener listener) {
- listener.failed(from, failure);
- }
- });
- for (ListenerExecutor listener : listeners) {
- listener.failed(from, failure);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
deleted file mode 100644
index 5806f9d..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.logging.LogEntry;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.internal.logging.LogEntryDecoder;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.kafka.client.FetchedMessage;
-import org.apache.twill.kafka.client.KafkaClient;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An abstract base class for {@link org.apache.twill.api.TwillController} implementations that use
- * ZooKeeper to control a running twill application.
- * <p>
- * Log entries are read from Kafka by a background poller thread and passed to the registered
- * {@link LogHandler}s. The poller is started when the controller starts with at least one handler,
- * or when a handler is added, whichever happens first.
- * </p>
- */
-public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
-  private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
-  private static final long SHUTDOWN_TIMEOUT_MS = 2000;
-  private static final long LOG_FETCH_TIMEOUT_MS = 5000;
-
-  private final Queue<LogHandler> logHandlers;
-  private final KafkaClient kafkaClient;
-  private final DiscoveryServiceClient discoveryServiceClient;
-  private final LogPollerThread logPoller;
-
-  /**
-   * @param runId the run id of the application being controlled
-   * @param zkClient the ZooKeeper client used for control, Kafka lookup and service discovery
-   * @param logHandlers the initial set of log handlers; more can be added with
-   *                    {@link #addLogHandler(LogHandler)}
-   */
-  public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
-    super(runId, zkClient);
-    this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
-    this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
-    this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
-    Iterables.addAll(this.logHandlers, logHandlers);
-    // Pass the field, not the constructor argument: the poller must iterate over the live queue,
-    // otherwise handlers registered later through addLogHandler() would never receive any log entry.
-    this.logPoller = new LogPollerThread(runId, kafkaClient, this.logHandlers);
-  }
-
-  @Override
-  protected void doStartUp() {
-    if (!logHandlers.isEmpty()) {
-      startLogPollerIfNeeded();
-    }
-  }
-
-  @Override
-  protected void doShutDown() {
-    logPoller.terminate();
-    try {
-      // Wait for the poller thread to stop.
-      logPoller.join(SHUTDOWN_TIMEOUT_MS);
-    } catch (InterruptedException e) {
-      LOG.warn("Joining of log poller thread interrupted.", e);
-      // Restore the interrupt status so that callers further up the stack can observe it.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public final synchronized void addLogHandler(LogHandler handler) {
-    logHandlers.add(handler);
-    startLogPollerIfNeeded();
-  }
-
-  @Override
-  public final Iterable<Discoverable> discoverService(String serviceName) {
-    return discoveryServiceClient.discover(serviceName);
-  }
-
-  @Override
-  public final ListenableFuture<Integer> changeInstances(String runnable, int newCount) {
-    return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
-  }
-
-  /**
-   * Starts the log poller thread if it has never been started. A thread can be started only once,
-   * so this guards against a second start (a handler added before start-up, followed by start-up)
-   * and against a start after the poller has already terminated; both would otherwise throw
-   * {@link IllegalThreadStateException}.
-   */
-  private synchronized void startLogPollerIfNeeded() {
-    if (logPoller.getState() == Thread.State.NEW) {
-      logPoller.start();
-    }
-  }
-
-  /**
-   * Daemon thread that repeatedly reads log messages from Kafka, decodes them into {@link LogEntry}
-   * objects and hands them to every registered {@link LogHandler}.
-   */
-  private static final class LogPollerThread extends Thread {
-
-    private final KafkaClient kafkaClient;
-    private final Iterable<LogHandler> logHandlers;
-    private volatile boolean running = true;
-
-    LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
-      super("twill-log-poller-" + runId.getId());
-      setDaemon(true);
-      this.kafkaClient = kafkaClient;
-      this.logHandlers = logHandlers;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Twill log poller thread '{}' started.", getName());
-      kafkaClient.startAndWait();
-      Gson gson = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
-                                   .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
-                                   .create();
-
-      while (running && !isInterrupted()) {
-        long offset;
-        try {
-          // Get the earliest offset
-          long[] offsets = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1).get(LOG_FETCH_TIMEOUT_MS,
-                                                                                    TimeUnit.MILLISECONDS);
-          // Should have one entry
-          offset = offsets[0];
-        } catch (Throwable t) {
-          // Keep retrying. An interrupt from terminate() also lands here; the loop then exits
-          // because the running flag has been cleared.
-          LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
-          continue;
-        }
-
-        // Now fetch log messages from Kafka
-        Iterator<FetchedMessage> messageIterator = kafkaClient.consume(Constants.LOG_TOPIC, 0,
-                                                                       offset, MAX_KAFKA_FETCH_SIZE);
-        try {
-          while (messageIterator.hasNext()) {
-            String json = Charsets.UTF_8.decode(messageIterator.next().getBuffer()).toString();
-            try {
-              LogEntry entry = gson.fromJson(json, LogEntry.class);
-              if (entry != null) {
-                invokeHandlers(entry);
-              }
-            } catch (Exception e) {
-              // A single bad entry or failing handler must not stop the polling.
-              LOG.error("Failed to decode log entry {}", json, e);
-            }
-          }
-        } catch (Throwable t) {
-          LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
-        }
-      }
-
-      kafkaClient.stopAndWait();
-      LOG.info("Twill log poller thread stopped.");
-    }
-
-    /**
-     * Asks the poller to stop: clears the running flag first, then interrupts the thread so that
-     * a blocking call wakes up.
-     */
-    void terminate() {
-      running = false;
-      interrupt();
-    }
-
-    private void invokeHandlers(LogEntry entry) {
-      for (LogHandler handler : logHandlers) {
-        handler.onLog(entry);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
deleted file mode 100644
index 98cc2b8..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.Messages;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.GsonBuilder;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
- * communicating with the remote service. This is designed to work in pair with the {@link ZKServiceDecorator}.
- */
-public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);
-
- private final ZKClient zkClient;
- private final InstanceNodeDataCallback instanceNodeDataCallback;
- private final StateNodeDataCallback stateNodeDataCallback;
- private final List<ListenableFuture<?>> messageFutures;
- private ListenableFuture<State> stopMessageFuture;
-
- protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
- super(runId);
- this.zkClient = zkClient;
- this.instanceNodeDataCallback = new InstanceNodeDataCallback();
- this.stateNodeDataCallback = new StateNodeDataCallback();
- this.messageFutures = Lists.newLinkedList();
- }
-
- @Override
- public final ListenableFuture<Command> sendCommand(Command command) {
- return sendMessage(Messages.createForAll(command), command);
- }
-
- @Override
- public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
- return sendMessage(Messages.createForRunnable(runnableName, command), command);
- }
-
- @Override
- protected final void startUp() {
- // Watch for instance node existence.
- actOnExists(getInstancePath(), new Runnable() {
- @Override
- public void run() {
- watchInstanceNode();
- }
- });
-
- // Watch for state node data
- actOnExists(getZKPath("state"), new Runnable() {
- @Override
- public void run() {
- watchStateNode();
- }
- });
-
- doStartUp();
- }
-
- @Override
- protected final synchronized void shutDown() {
- if (stopMessageFuture == null) {
- stopMessageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(),
- SystemMessages.stopApplication(), State.TERMINATED);
- }
-
- // Cancel all pending message futures.
- for (ListenableFuture<?> future : messageFutures) {
- future.cancel(true);
- }
-
- doShutDown();
- }
-
- /**
- * Sends a {@link Message} to the remote service. Returns a future that will be completed when the message
- * has been processed.
- * @param message The message to send.
- * @param result Object to set into the future when message is being processed.
- * @param <V> Type of the result.
- * @return A {@link ListenableFuture} that will be completed when the message has been processed.
- */
- protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
- if (!isRunning()) {
- return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
- }
- final ListenableFuture<V> messageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(), message, result);
- messageFutures.add(messageFuture);
- messageFuture.addListener(new Runnable() {
- @Override
- public void run() {
- // If the completion is triggered when stopping, do nothing.
- if (state() == State.STOPPING) {
- return;
- }
- synchronized (AbstractZKServiceController.this) {
- messageFutures.remove(messageFuture);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- return messageFuture;
- }
-
- protected final ListenableFuture<State> getStopMessageFuture() {
- return stopMessageFuture;
- }
-
- /**
- * Called during startup. Executed in the startup thread.
- */
- protected abstract void doStartUp();
-
- /**
- * Called during shutdown. Executed in the shutdown thread.
- */
- protected abstract void doShutDown();
-
- /**
- * Called when an update on the live instance node is detected.
- * @param nodeData The updated live instance node data or {@code null} if there is an error when fetching
- * the node data.
- */
- protected abstract void instanceNodeUpdated(NodeData nodeData);
-
- /**
- * Called when an update on the state node is detected.
- * @param stateNode The update state node data or {@code null} if there is an error when fetching the node data.
- */
- protected abstract void stateNodeUpdated(StateNode stateNode);
-
- protected synchronized void forceShutDown() {
- if (stopMessageFuture == null) {
- // In force shutdown, don't send message.
- stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
- }
- stop();
- }
-
-
- private void actOnExists(final String path, final Runnable action) {
- // Watch for node existence.
- final AtomicBoolean nodeExists = new AtomicBoolean(false);
- Futures.addCallback(zkClient.exists(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // When node is created, call the action.
- // Other event type would be handled by the action.
- if (event.getType() == Event.EventType.NodeCreated && nodeExists.compareAndSet(false, true)) {
- action.run();
- }
- }
- }), new FutureCallback<Stat>() {
- @Override
- public void onSuccess(Stat result) {
- if (result != null && nodeExists.compareAndSet(false, true)) {
- action.run();
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
- forceShutDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- private void watchInstanceNode() {
- Futures.addCallback(zkClient.getData(getInstancePath(), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- State state = state();
- if (state != State.NEW && state != State.STARTING && state != State.RUNNING) {
- // Ignore ZK node events when it is in stopping sequence.
- return;
- }
- switch (event.getType()) {
- case NodeDataChanged:
- watchInstanceNode();
- break;
- case NodeDeleted:
- // When the ephemeral node goes away, treat the remote service stopped.
- forceShutDown();
- break;
- default:
- LOG.info("Ignore ZK event for instance node: {}", event);
- }
- }
- }), instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
- }
-
- private void watchStateNode() {
- Futures.addCallback(zkClient.getData(getZKPath("state"), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- State state = state();
- if (state != State.NEW && state != State.STARTING && state != State.RUNNING) {
- // Ignore ZK node events when it is in stopping sequence.
- return;
- }
- switch (event.getType()) {
- case NodeDataChanged:
- watchStateNode();
- break;
- default:
- LOG.info("Ignore ZK event for state node: {}", event);
- }
- }
- }), stateNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
- }
-
- /**
- * Returns the path prefix for creating sequential message node for the remote service.
- */
- private String getMessagePrefix() {
- return getZKPath("messages/msg");
- }
-
- /**
- * Returns the zookeeper node path for the ephemeral instance node for this runId.
- */
- private String getInstancePath() {
- return String.format("/instances/%s", getRunId().getId());
- }
-
- private String getZKPath(String path) {
- return String.format("/%s/%s", getRunId().getId(), path);
- }
-
- private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
-
- @Override
- public void onSuccess(NodeData result) {
- instanceNodeUpdated(result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed in fetching instance node data.", t);
- if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
- // If the node is gone, treat the remote service stopped.
- forceShutDown();
- } else {
- instanceNodeUpdated(null);
- }
- }
- }
-
- private final class StateNodeDataCallback implements FutureCallback<NodeData> {
-
- @Override
- public void onSuccess(NodeData result) {
- byte[] data = result.getData();
- if (data == null) {
- stateNodeUpdated(null);
- return;
- }
- StateNode stateNode = new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec())
- .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
- .create()
- .fromJson(new String(data, Charsets.UTF_8), StateNode.class);
-
- stateNodeUpdated(stateNode);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed in fetching state node data.", t);
- stateNodeUpdated(null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
deleted file mode 100644
index a0e9a71..0000000
--- a/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.utils.Dependencies;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedOutputStream;
-
-/**
- * This class builds jar files based on class dependencies.
- */
-public final class ApplicationBundler {
-
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
-
- public static final String SUBDIR_CLASSES = "classes/";
- public static final String SUBDIR_LIB = "lib/";
- public static final String SUBDIR_RESOURCES = "resources/";
-
- private final List<String> excludePackages;
- private final List<String> includePackages;
- private final Set<String> bootstrapClassPaths;
- private final CRC32 crc32;
-
- /**
- * Constructs a ApplicationBundler.
- *
- * @param excludePackages Class packages to exclude
- */
- public ApplicationBundler(Iterable<String> excludePackages) {
- this(excludePackages, ImmutableList.<String>of());
- }
-
- /**
- * Constructs a ApplicationBundler.
- *
- * @param excludePackages Class packages to exclude
- * @param includePackages Class packages that should be included. Anything in this list will override the
- * one provided in excludePackages.
- */
- public ApplicationBundler(Iterable<String> excludePackages, Iterable<String> includePackages) {
- this.excludePackages = ImmutableList.copyOf(excludePackages);
- this.includePackages = ImmutableList.copyOf(includePackages);
-
- ImmutableSet.Builder<String> builder = ImmutableSet.builder();
- for (String classpath : Splitter.on(File.pathSeparatorChar).split(System.getProperty("sun.boot.class.path"))) {
- File file = new File(classpath);
- builder.add(file.getAbsolutePath());
- try {
- builder.add(file.getCanonicalPath());
- } catch (IOException e) {
- // Ignore the exception and proceed.
- }
- }
- this.bootstrapClassPaths = builder.build();
- this.crc32 = new CRC32();
-
- }
-
- public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
- createBundle(target, classes, ImmutableList.<URI>of());
- }
-
- /**
- * Same as calling {@link #createBundle(Location, Iterable)}.
- */
- public void createBundle(Location target, Class<?> clz, Class<?>...classes) throws IOException {
- createBundle(target, ImmutableSet.<Class<?>>builder().add(clz).add(classes).build());
- }
-
- /**
- * Creates a jar file which includes all the given classes and all the classes that they depended on.
- * The jar will also include all classes and resources under the packages as given as include packages
- * in the constructor.
- *
- * @param target Where to save the target jar file.
- * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
- * lib/ entry, otherwise under the resources/ entry.
- * @param classes Set of classes to start the dependency traversal.
- * @throws IOException
- */
- public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
- LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
- // Write the jar to local tmp file first
- File tmpJar = File.createTempFile(target.getName(), ".tmp");
- try {
- Set<String> entries = Sets.newHashSet();
- JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(tmpJar));
- try {
- // Find class dependencies
- findDependencies(classes, entries, jarOut);
-
- // Add extra resources
- for (URI resource : resources) {
- copyResource(resource, entries, jarOut);
- }
- } finally {
- jarOut.close();
- }
- LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
- // Copy the tmp jar into destination.
- OutputStream os = null;
- try {
- os = new BufferedOutputStream(target.getOutputStream());
- Files.copy(tmpJar, os);
- } catch (IOException e) {
- throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
- } finally {
- if (os != null) {
- os.close();
- }
- }
- LOG.debug("finished creating bundle at {}", target.toURI());
- } finally {
- tmpJar.delete();
- LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
- }
- }
-
- private void findDependencies(Iterable<Class<?>> classes, final Set<String> entries,
- final JarOutputStream jarOut) throws IOException {
-
- Iterable<String> classNames = Iterables.transform(classes, new Function<Class<?>, String>() {
- @Override
- public String apply(Class<?> input) {
- return input.getName();
- }
- });
-
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- if (classLoader == null) {
- classLoader = getClass().getClassLoader();
- }
- Dependencies.findClassDependencies(classLoader, new Dependencies.ClassAcceptor() {
- @Override
- public boolean accept(String className, URL classUrl, URL classPathUrl) {
- if (bootstrapClassPaths.contains(classPathUrl.getFile())) {
- return false;
- }
-
- boolean shouldInclude = false;
- for (String include : includePackages) {
- if (className.startsWith(include)) {
- shouldInclude = true;
- break;
- }
- }
-
- if (!shouldInclude) {
- for (String exclude : excludePackages) {
- if (className.startsWith(exclude)) {
- return false;
- }
- }
- }
-
- putEntry(className, classUrl, classPathUrl, entries, jarOut);
- return true;
- }
- }, classNames);
- }
-
- private void putEntry(String className, URL classUrl, URL classPathUrl, Set<String> entries, JarOutputStream jarOut) {
- String classPath = classPathUrl.getFile();
- if (classPath.endsWith(".jar")) {
- saveDirEntry(SUBDIR_LIB, entries, jarOut);
- saveEntry(SUBDIR_LIB + classPath.substring(classPath.lastIndexOf('/') + 1), classPathUrl, entries, jarOut, false);
- } else {
- // Class file, put it under the classes directory
- saveDirEntry(SUBDIR_CLASSES, entries, jarOut);
- if ("file".equals(classPathUrl.getProtocol())) {
- // Copy every files under the classPath
- try {
- copyDir(new File(classPathUrl.toURI()), SUBDIR_CLASSES, entries, jarOut);
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- } else {
- String entry = SUBDIR_CLASSES + className.replace('.', '/') + ".class";
- saveDirEntry(entry.substring(0, entry.lastIndexOf('/') + 1), entries, jarOut);
- saveEntry(entry, classUrl, entries, jarOut, true);
- }
- }
- }
-
- /**
- * Saves a directory entry to the jar output.
- */
- private void saveDirEntry(String path, Set<String> entries, JarOutputStream jarOut) {
- if (entries.contains(path)) {
- return;
- }
-
- try {
- String entry = "";
- for (String dir : Splitter.on('/').omitEmptyStrings().split(path)) {
- entry += dir + '/';
- if (entries.add(entry)) {
- JarEntry jarEntry = new JarEntry(entry);
- jarEntry.setMethod(JarOutputStream.STORED);
- jarEntry.setSize(0L);
- jarEntry.setCrc(0L);
- jarOut.putNextEntry(jarEntry);
- jarOut.closeEntry();
- }
- }
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Saves a class entry to the jar output.
- */
- private void saveEntry(String entry, URL url, Set<String> entries, JarOutputStream jarOut, boolean compress) {
- LOG.debug("adding bundle entry " + entry);
- if (!entries.add(entry)) {
- return;
- }
- try {
- JarEntry jarEntry = new JarEntry(entry);
- InputStream is = url.openStream();
-
- try {
- if (compress) {
- jarOut.putNextEntry(jarEntry);
- ByteStreams.copy(is, jarOut);
- } else {
- crc32.reset();
- TransferByteOutputStream os = new TransferByteOutputStream();
- CheckedOutputStream checkedOut = new CheckedOutputStream(os, crc32);
- ByteStreams.copy(is, checkedOut);
- checkedOut.close();
-
- long size = os.size();
- jarEntry.setMethod(JarEntry.STORED);
- jarEntry.setSize(size);
- jarEntry.setCrc(checkedOut.getChecksum().getValue());
- jarOut.putNextEntry(jarEntry);
- os.transfer(jarOut);
- }
- } finally {
- is.close();
- }
- jarOut.closeEntry();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
-
- /**
- * Copies all entries under the file path.
- */
- private void copyDir(File baseDir, String entryPrefix,
- Set<String> entries, JarOutputStream jarOut) throws IOException {
- LOG.debug("adding whole dir {} to bundle at '{}'", baseDir, entryPrefix);
- URI baseUri = baseDir.toURI();
- Queue<File> queue = Lists.newLinkedList();
- Collections.addAll(queue, baseDir.listFiles());
- while (!queue.isEmpty()) {
- File file = queue.remove();
-
- String entry = entryPrefix + baseUri.relativize(file.toURI()).getPath();
- if (entries.add(entry)) {
- jarOut.putNextEntry(new JarEntry(entry));
- if (file.isFile()) {
- try {
- Files.copy(file, jarOut);
- } catch (IOException e) {
- throw new IOException("failure copying from " + file.getAbsoluteFile() + " to JAR file entry " + entry, e);
- }
- }
- jarOut.closeEntry();
- }
-
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files != null) {
- queue.addAll(Arrays.asList(files));
- }
- }
- }
- }
-
- private void copyResource(URI resource, Set<String> entries, JarOutputStream jarOut) throws IOException {
- if ("file".equals(resource.getScheme())) {
- File file = new File(resource);
- if (file.isDirectory()) {
- saveDirEntry(SUBDIR_RESOURCES, entries, jarOut);
- copyDir(file, SUBDIR_RESOURCES, entries, jarOut);
- return;
- }
- }
-
- URL url = resource.toURL();
- String path = url.getFile();
- String prefix = path.endsWith(".jar") ? SUBDIR_LIB : SUBDIR_RESOURCES;
- path = prefix + path.substring(path.lastIndexOf('/') + 1);
-
- saveDirEntry(prefix, entries, jarOut);
- jarOut.putNextEntry(new JarEntry(path));
- InputStream is = url.openStream();
- try {
- ByteStreams.copy(is, jarOut);
- } finally {
- is.close();
- }
- }
-
- private static final class TransferByteOutputStream extends ByteArrayOutputStream {
-
- public void transfer(OutputStream os) throws IOException {
- os.write(buf, 0, count);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/Arguments.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Arguments.java b/core/src/main/java/org/apache/twill/internal/Arguments.java
deleted file mode 100644
index a78547c..0000000
--- a/core/src/main/java/org/apache/twill/internal/Arguments.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-
-import java.util.List;
-
-/**
- * Class that encapsulate application arguments and per runnable arguments.
- */
-public final class Arguments {
-
- private final List<String> arguments;
- private final Multimap<String, String> runnableArguments;
-
- public Arguments(List<String> arguments, Multimap<String, String> runnableArguments) {
- this.arguments = ImmutableList.copyOf(arguments);
- this.runnableArguments = ImmutableMultimap.copyOf(runnableArguments);
- }
-
- public List<String> getArguments() {
- return arguments;
- }
-
- public Multimap<String, String> getRunnableArguments() {
- return runnableArguments;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
deleted file mode 100644
index 61bdaef..0000000
--- a/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- *
- */
-public final class BasicTwillContext implements TwillContext {
-
- private final RunId runId;
- private final RunId appRunId;
- private final InetAddress host;
- private final String[] args;
- private final String[] appArgs;
- private final TwillRunnableSpecification spec;
- private final int instanceId;
- private final DiscoveryService discoveryService;
- private final int allowedMemoryMB;
- private final int virtualCores;
- private volatile int instanceCount;
-
- public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
- TwillRunnableSpecification spec, int instanceId, DiscoveryService discoveryService,
- int instanceCount, int allowedMemoryMB, int virtualCores) {
- this.runId = runId;
- this.appRunId = appRunId;
- this.host = host;
- this.args = args;
- this.appArgs = appArgs;
- this.spec = spec;
- this.instanceId = instanceId;
- this.discoveryService = discoveryService;
- this.instanceCount = instanceCount;
- this.allowedMemoryMB = allowedMemoryMB;
- this.virtualCores = virtualCores;
- }
-
- @Override
- public RunId getRunId() {
- return runId;
- }
-
- @Override
- public RunId getApplicationRunId() {
- return appRunId;
- }
-
- @Override
- public int getInstanceCount() {
- return instanceCount;
- }
-
- public void setInstanceCount(int count) {
- this.instanceCount = count;
- }
-
- @Override
- public InetAddress getHost() {
- return host;
- }
-
- @Override
- public String[] getArguments() {
- return args;
- }
-
- @Override
- public String[] getApplicationArguments() {
- return appArgs;
- }
-
- @Override
- public TwillRunnableSpecification getSpecification() {
- return spec;
- }
-
- @Override
- public int getInstanceId() {
- return instanceId;
- }
-
- @Override
- public int getVirtualCores() {
- return virtualCores;
- }
-
- @Override
- public int getMaxMemoryMB() {
- return allowedMemoryMB;
- }
-
- @Override
- public Cancellable announce(final String serviceName, final int port) {
- return discoveryService.register(new Discoverable() {
- @Override
- public String getName() {
- return serviceName;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(getHost(), port);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/Configs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Configs.java b/core/src/main/java/org/apache/twill/internal/Configs.java
deleted file mode 100644
index 0fa1df8..0000000
--- a/core/src/main/java/org/apache/twill/internal/Configs.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- *
- */
-public final class Configs {
-
- public static final class Keys {
- /**
- * Size in MB of reserved memory for Java process (non-heap memory).
- */
- public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
-
- private Keys() {
- }
- }
-
- public static final class Defaults {
- // By default have 200MB reserved for Java process.
- public static final int JAVA_RESERVED_MEMORY_MB = 200;
-
- private Defaults() {
- }
- }
-
- private Configs() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Constants.java b/core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 0387d3e..0000000
--- a/core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
- public static final String LOG_TOPIC = "log";
-
- /** Maximum number of seconds for AM to start. */
- public static final int APPLICATION_MAX_START_SECONDS = 60;
- /** Maximum number of seconds for AM to stop. */
- public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
- public static final long PROVISION_TIMEOUT = 30000;
-
- /** Memory size of AM */
- public static final int APP_MASTER_MEMORY_MB = 512;
-
- public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
- public static final String STDOUT = "stdout";
- public static final String STDERR = "stderr";
-
- /**
- * Constants for names of internal files that are shared between client, AM and containers.
- */
- public static final class Files {
-
- public static final String LAUNCHER_JAR = "launcher.jar";
- public static final String APP_MASTER_JAR = "appMaster.jar";
- public static final String CONTAINER_JAR = "container.jar";
- public static final String LOCALIZE_FILES = "localizeFiles.json";
- public static final String TWILL_SPEC = "twillSpec.json";
- public static final String ARGUMENTS = "arguments.json";
- public static final String LOGBACK_TEMPLATE = "logback-template.xml";
- public static final String KAFKA = "kafka.tgz";
- public static final String JVM_OPTIONS = "jvm.opts";
- public static final String CREDENTIALS = "credentials.store";
-
- private Files() {
- }
- }
-
- private Constants() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
deleted file mode 100644
index 67c21d3..0000000
--- a/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import java.net.InetAddress;
-
-/**
- * Describes the container that the process is, or will be, running in.
- */
-public interface ContainerInfo {
-
- String getId();
-
- InetAddress getHost();
-
- int getPort();
-
- int getMemoryMB();
-
- int getVirtualCores();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
deleted file mode 100644
index 705943c..0000000
--- a/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Immutable holder that pairs a container id with a host name; both are supplied at construction time.
- */
-public final class ContainerLiveNodeData {
-
- private final String containerId;
- private final String host;
-
- public ContainerLiveNodeData(String containerId, String host) {
- this.containerId = containerId;
- this.host = host;
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public String getHost() {
- return host;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java b/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
deleted file mode 100644
index fd50028..0000000
--- a/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * A {@link ContainerInfo} whose values are read from the {@code YARN_CONTAINER_*} environment variables.
- */
-public final class EnvContainerInfo implements ContainerInfo {
- private final String id;
- private final InetAddress host;
- private final int port;
- private final int virtualCores;
- private final int memoryMB;
-
- public EnvContainerInfo() throws UnknownHostException {
- id = System.getenv(EnvKeys.YARN_CONTAINER_ID);
- host = InetAddress.getByName(System.getenv(EnvKeys.YARN_CONTAINER_HOST)); // a null host resolves to loopback
- port = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_PORT)); // NumberFormatException if unset
- virtualCores = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES));
- memoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
- }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public InetAddress getHost() {
- return host;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public int getMemoryMB() {
- return memoryMB;
- }
-
- @Override
- public int getVirtualCores() {
- return virtualCores;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/EnvKeys.java b/core/src/main/java/org/apache/twill/internal/EnvKeys.java
deleted file mode 100644
index 9bf6523..0000000
--- a/core/src/main/java/org/apache/twill/internal/EnvKeys.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Central place for defining the names of common environment keys.
- */
-public final class EnvKeys {
-
- public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT";
- public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID";
- public static final String TWILL_RUN_ID = "TWILL_RUN_ID";
- public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID";
- public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT";
- public static final String TWILL_RESERVED_MEMORY_MB = "TWILL_RESERVED_MEMORY_MB";
-
- public static final String TWILL_FS_USER = "TWILL_FS_USER";
-
- /**
- * Cluster filesystem directory for storing twill app related files.
- */
- public static final String TWILL_APP_DIR = "TWILL_APP_DIR";
-
- public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
- public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
-
- public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
-
- public static final String YARN_APP_ID = "YARN_APP_ID";
- public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
- public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
-
- public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID";
- public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST";
- public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT";
- /**
- * Used to inform runnables of their resource usage.
- */
- public static final String YARN_CONTAINER_VIRTUAL_CORES = "YARN_CONTAINER_VIRTUAL_CORES";
- public static final String YARN_CONTAINER_MEMORY_MB = "YARN_CONTAINER_MEMORY_MB";
-
- private EnvKeys() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
deleted file mode 100644
index 9d3e156..0000000
--- a/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-
-/**
- * Wrapper for {@link Service.Listener} that runs each callback of the delegate on a given {@link Executor}.
- * Also makes sure each callback is invoked at most once; anything thrown by the delegate is logged, not rethrown.
- */
-final class ListenerExecutor implements Service.Listener {
-
- private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);
-
- private final Service.Listener delegate;
- private final Executor executor;
- private final ConcurrentMap<Service.State, Boolean> callStates = Maps.newConcurrentMap();
-
- ListenerExecutor(Service.Listener delegate, Executor executor) {
- this.delegate = delegate;
- this.executor = executor;
- }
-
- @Override
- public void starting() {
- if (hasCalled(Service.State.STARTING)) {
- return;
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- delegate.starting();
- } catch (Throwable t) {
- LOG.warn("Exception thrown from listener", t);
- }
- }
- });
- }
-
- @Override
- public void running() {
- if (hasCalled(Service.State.RUNNING)) {
- return;
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- delegate.running();
- } catch (Throwable t) {
- LOG.warn("Exception thrown from listener", t);
- }
- }
- });
- }
-
- @Override
- public void stopping(final Service.State from) {
- if (hasCalled(Service.State.STOPPING)) {
- return;
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- delegate.stopping(from);
- } catch (Throwable t) {
- LOG.warn("Exception thrown from listener", t);
- }
- }
- });
- }
-
- @Override
- public void terminated(final Service.State from) {
- if (hasCalled(Service.State.TERMINATED)) {
- return;
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- delegate.terminated(from);
- } catch (Throwable t) {
- LOG.warn("Exception thrown from listener", t);
- }
- }
- });
- }
-
- @Override
- public void failed(final Service.State from, final Throwable failure) {
- // failed() and terminated() share the TERMINATED state for this check, as only one of the two can be called.
- if (hasCalled(Service.State.TERMINATED)) {
- return;
- }
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- delegate.failed(from, failure);
- } catch (Throwable t) {
- LOG.warn("Exception thrown from listener", t);
- }
- }
- });
- }
-
- private boolean hasCalled(Service.State state) {
- return callStates.putIfAbsent(state, true) != null; // null means this is the first call for the state
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java b/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
deleted file mode 100644
index 4f71a05..0000000
--- a/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.EventHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * An {@link EventHandler} that only logs launch timeouts and asks for a recheck after {@link Constants#PROVISION_TIMEOUT} ms.
- */
-public final class LogOnlyEventHandler extends EventHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(LogOnlyEventHandler.class);
-
- @Override
- public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
- for (TimeoutEvent event : timeoutEvents) {
- LOG.info("Requested {} containers for runnable {}, only got {} after {} ms.",
- event.getExpectedInstances(), event.getRunnableName(),
- event.getActualInstances(), System.currentTimeMillis() - event.getRequestTime());
- }
- return TimeoutAction.recheck(Constants.PROVISION_TIMEOUT, TimeUnit.MILLISECONDS);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ProcessController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ProcessController.java b/core/src/main/java/org/apache/twill/internal/ProcessController.java
deleted file mode 100644
index 4453838..0000000
--- a/core/src/main/java/org/apache/twill/internal/ProcessController.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.common.Cancellable;
-
-/**
- * For controlling a launched YARN process.
- *
- * @param <R> Report type.
- */
-public interface ProcessController<R> extends Cancellable {
-
- R getReport();
-
- /**
- * Requests the running process to stop.
- */
- void cancel();
-}