Posted to commits@twill.apache.org by ch...@apache.org on 2016/01/26 20:16:19 UTC

[01/22] incubator-twill git commit: Add hsaputra for Henry Saputra as entry in the master pom, as well as adding for Albert and Poorna.

Repository: incubator-twill
Updated Branches:
  refs/heads/site 81a5e462b -> 87e2b4652


Add a developer entry for Henry Saputra (hsaputra) to the master pom, and add the missing ids for Albert and Poorna.

This closes #61 from GitHub.

Signed-off-by: hsaputra <hs...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d2d90817
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d2d90817
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d2d90817

Branch: refs/heads/site
Commit: d2d908172cd7fe3f37ce440a5f7ffc01b43625a6
Parents: a303e3b
Author: hsaputra <hs...@apache.org>
Authored: Wed Aug 5 12:04:11 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Thu Aug 6 16:53:58 2015 -0700

----------------------------------------------------------------------
 pom.xml | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d2d90817/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7685b58..4daa68d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
             </roles>
         </developer>
         <developer>
+            <id>ashau</id>
             <name>Albert Shau</name>
             <email>ashau@apache.org</email>
             <roles>
@@ -121,12 +122,21 @@
             </roles>
         </developer>
         <developer>
+            <id>poorna</id>
             <name>Poorna Chandra</name>
             <email>poorna@apache.org</email>
             <roles>
                 <role>Committer</role>
             </roles>
         </developer>
+        <developer>
+            <id>hsaputra</id>
+            <name>Henry Saputra</name>
+            <email>hsaputra@apache.org</email>
+            <roles>
+                <role>Committer</role>
+            </roles>
+        </developer>
     </developers>
 
     <modules>


[14/22] incubator-twill git commit: (TWILL-158) Added FileContext Location and LocationFactory

Posted by ch...@apache.org.
(TWILL-158) Added FileContext Location and LocationFactory

- Added new unit-tests.
- Minor improvement on HDFSLocationFactory to handle creation URI correctly

This closes #73 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/388a6d92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/388a6d92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/388a6d92

Branch: refs/heads/site
Commit: 388a6d922dd89dd8e1f5a9fed5aefe265ef2eeed
Parents: b80f9b8
Author: Terence Yim <ch...@apache.org>
Authored: Thu Dec 24 12:38:03 2015 +0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 14:54:24 2016 -0800

----------------------------------------------------------------------
 .../twill/filesystem/LocalLocationFactory.java  |   4 +-
 .../twill/filesystem/FileContextLocation.java   | 219 +++++++++++++++++++
 .../filesystem/FileContextLocationFactory.java  | 119 ++++++++++
 .../apache/twill/filesystem/HDFSLocation.java   |   2 +-
 .../twill/filesystem/HDFSLocationFactory.java   |  18 +-
 .../filesystem/FileContextLocationTest.java     |  50 +++++
 .../twill/filesystem/HDFSLocationTest.java      |   6 +-
 .../twill/filesystem/LocalLocationTest.java     |  17 +-
 .../twill/filesystem/LocationTestBase.java      |  68 +++++-
 9 files changed, 477 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
index 82847b2..8e7ab8b 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
@@ -31,7 +31,7 @@ public final class LocalLocationFactory implements LocationFactory {
    * Constructs a LocalLocationFactory that Location created will be relative to system root.
    */
   public LocalLocationFactory() {
-    this(new File("/"));
+    this(new File(File.separator));
   }
 
   public LocalLocationFactory(File basePath) {
@@ -40,7 +40,7 @@ public final class LocalLocationFactory implements LocationFactory {
 
   @Override
   public Location create(String path) {
-    return new LocalLocation(this, new File(basePath, path));
+    return create(new File(basePath, path).toURI());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
new file mode 100644
index 0000000..f92954e
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HAUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link Location} using {@link FileContext}.
+ */
+final class FileContextLocation implements Location {
+
+  private final FileContextLocationFactory locationFactory;
+  private final FileContext fc;
+  private final Path path;
+
+  FileContextLocation(FileContextLocationFactory locationFactory, FileContext fc, Path path) {
+    this.locationFactory = locationFactory;
+    this.fc = fc;
+    this.path = path;
+  }
+
+  @Override
+  public boolean exists() throws IOException {
+    return fc.util().exists(path);
+  }
+
+  @Override
+  public String getName() {
+    return path.getName();
+  }
+
+  @Override
+  public boolean createNew() throws IOException {
+    try {
+      fc.create(path, EnumSet.of(CreateFlag.CREATE), Options.CreateOpts.createParent()).close();
+      return true;
+    } catch (FileAlreadyExistsException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return fc.open(path);
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.createParent());
+  }
+
+  @Override
+  public OutputStream getOutputStream(String permission) throws IOException {
+    return fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+                     Options.CreateOpts.perms(new FsPermission(permission)),
+                     Options.CreateOpts.createParent());
+  }
+
+  @Override
+  public Location append(String child) throws IOException {
+    if (child.startsWith("/")) {
+      child = child.substring(1);
+    }
+    return new FileContextLocation(locationFactory, fc, new Path(URI.create(path.toUri() + "/" + child)));
+  }
+
+  @Override
+  public Location getTempFile(String suffix) throws IOException {
+    Path path = new Path(
+      URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
+    return new FileContextLocation(locationFactory, fc, path);
+  }
+
+  @Override
+  public URI toURI() {
+    // In HA mode, the URI of a path created through FileContext is incompatible with the FileSystem
+    // used inside Hadoop. FileContext is not HA aware and always appends the port to the path URI,
+    // while the DistributedFileSystem always uses the cluster logical name, which doesn't allow
+    // a port in it.
+    URI uri = path.toUri();
+    if (HAUtil.isLogicalUri(locationFactory.getConfiguration(), uri)) {
+      try {
+        // Need to strip out the port if in HA
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
+                       -1, uri.getPath(), uri.getQuery(), uri.getFragment());
+      } catch (URISyntaxException e) {
+        // Shouldn't happen
+        throw Throwables.propagate(e);
+      }
+    }
+
+    return uri;
+  }
+
+  @Override
+  public boolean delete() throws IOException {
+    return delete(false);
+  }
+
+  @Override
+  public boolean delete(boolean recursive) throws IOException {
+    return fc.delete(path, recursive);
+  }
+
+  @Nullable
+  @Override
+  public Location renameTo(Location destination) throws IOException {
+    Path targetPath = new Path(destination.toURI());
+    try {
+      fc.rename(path, targetPath, Options.Rename.OVERWRITE);
+      return new FileContextLocation(locationFactory, fc, targetPath);
+    } catch (FileAlreadyExistsException | FileNotFoundException | ParentNotDirectoryException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean mkdirs() throws IOException {
+    try {
+      fc.mkdir(path, null, true);
+      return true;
+    } catch (FileAlreadyExistsException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public long length() throws IOException {
+    return fc.getFileStatus(path).getLen();
+  }
+
+  @Override
+  public long lastModified() throws IOException {
+    return fc.getFileStatus(path).getModificationTime();
+  }
+
+  @Override
+  public boolean isDirectory() throws IOException {
+    try {
+      return fc.getFileStatus(path).isDirectory();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public List<Location> list() throws IOException {
+    RemoteIterator<FileStatus> statuses = fc.listStatus(path);
+    ImmutableList.Builder<Location> result = ImmutableList.builder();
+    while (statuses.hasNext()) {
+      FileStatus status = statuses.next();
+      if (!Objects.equals(path, status.getPath())) {
+        result.add(new FileContextLocation(locationFactory, fc, status.getPath()));
+      }
+    }
+    return result.build();
+
+  }
+
+  @Override
+  public LocationFactory getLocationFactory() {
+    return locationFactory;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FileContextLocation that = (FileContextLocation) o;
+    return Objects.equals(path, that.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path);
+  }
+}
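
Note on the toURI() change above: the port stripping relies only on java.net.URI, so it can be tried without Hadoop on the classpath. The following is a minimal sketch, not the committed code. The class name LogicalUriSketch, the method name stripPort and the host "mycluster" are made up for illustration, and the HAUtil.isLogicalUri check is left out, so the port is always removed here.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, for illustration only; not part of Twill.
final class LogicalUriSketch {

  // Rebuilds the URI from its components with port -1,
  // which java.net.URI treats as "no port defined".
  static URI stripPort(URI uri) {
    try {
      return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
                     -1, uri.getPath(), uri.getQuery(), uri.getFragment());
    } catch (URISyntaxException e) {
      // The components come from an already valid URI, so this is not expected.
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    // "mycluster" stands in for an HA logical name (an assumption of this sketch).
    System.out.println(stripPort(URI.create("hdfs://mycluster:8020/twill/app.jar")));
  }
}
```

Scheme, path, query and fragment are carried over unchanged; only the port is dropped.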

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
new file mode 100644
index 0000000..d64be71
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * A {@link LocationFactory} implementation that uses {@link FileContext} to create {@link Location}.
+ */
+public class FileContextLocationFactory implements LocationFactory {
+
+  private final Configuration configuration;
+  private final FileContext fc;
+  private final Path pathBase;
+
+  /**
+   * Same as {@link #FileContextLocationFactory(Configuration, String) FileContextLocationFactory(configuration, "/")}.
+   */
+  public FileContextLocationFactory(Configuration configuration) {
+    this(configuration, "/");
+  }
+
+  /**
+   * Creates a new instance.
+   *
+   * @param configuration the hadoop configuration
+   * @param pathBase base path for all non-absolute locations created through this {@link LocationFactory}.
+   */
+  public FileContextLocationFactory(Configuration configuration, String pathBase) {
+    this.configuration = configuration;
+    this.fc = createFileContext(configuration);
+    this.pathBase = new Path(pathBase.startsWith("/") ? pathBase : "/" + pathBase);
+  }
+
+  @Override
+  public Location create(String path) {
+    if (path.startsWith("/")) {
+      path = path.substring(1);
+    }
+    Path locationPath;
+    if (path.isEmpty()) {
+      locationPath = pathBase;
+    } else {
+      locationPath = new Path(path);
+    }
+    locationPath = locationPath.makeQualified(fc.getDefaultFileSystem().getUri(), pathBase);
+    return new FileContextLocation(this, fc, locationPath);
+  }
+
+  @Override
+  public Location create(URI uri) {
+    URI contextURI = fc.getWorkingDirectory().toUri();
+    if (Objects.equals(contextURI.getScheme(), uri.getScheme())
+      && Objects.equals(contextURI.getAuthority(), uri.getAuthority())) {
+      // A full URI
+      return new FileContextLocation(this, fc, new Path(uri));
+    }
+
+    if (uri.isAbsolute()) {
+      // Needs to be of the same scheme
+      Preconditions.checkArgument(Objects.equals(contextURI.getScheme(), uri.getScheme()),
+                                  "Only URI with '%s' scheme is supported", contextURI.getScheme());
+      Path locationPath = new Path(uri).makeQualified(fc.getDefaultFileSystem().getUri(), pathBase);
+      return new FileContextLocation(this, fc, locationPath);
+    }
+
+    return create(uri.getPath());
+  }
+
+  @Override
+  public Location getHomeLocation() {
+    return new FileContextLocation(this, fc, fc.getHomeDirectory());
+  }
+
+  /**
+   * Returns the {@link FileContext} used by this {@link LocationFactory}.
+   */
+  public FileContext getFileContext() {
+    return fc;
+  }
+
+  /**
+   * Returns the {@link Configuration} used by this {@link LocationFactory}.
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  private static FileContext createFileContext(Configuration configuration) {
+    try {
+      return FileContext.getFileContext(configuration);
+    } catch (UnsupportedFileSystemException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index 818fe23..aa29384 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -35,7 +35,7 @@ import java.util.List;
 import java.util.UUID;
 
 /**
- * A concrete implementation of {@link Location} for the HDFS filesystem.
+ * A concrete implementation of {@link Location} for the HDFS filesystem using {@link FileSystem}.
  */
 final class HDFSLocation implements Location {
   private final FileSystem fs;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
index 65146a8..728de32 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.filesystem;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -24,10 +25,14 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Objects;
 
 /**
- * A {@link LocationFactory} that creates HDFS {@link Location}.
+ * A {@link LocationFactory} that creates HDFS {@link Location} using {@link FileSystem}.
+ *
+ * @deprecated Deprecated since 0.7.0. Use {@link FileContextLocationFactory} instead.
  */
+@Deprecated
 public final class HDFSLocationFactory implements LocationFactory {
 
   private final FileSystem fileSystem;
@@ -63,14 +68,21 @@ public final class HDFSLocationFactory implements LocationFactory {
 
   @Override
   public Location create(URI uri) {
-    if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
+    URI fsURI = fileSystem.getUri();
+    if (Objects.equals(fsURI.getScheme(), uri.getScheme())
+      && Objects.equals(fsURI.getAuthority(), uri.getAuthority())) {
       // It's a full URI
       return new HDFSLocation(this, new Path(uri));
     }
+
     if (uri.isAbsolute()) {
+      // Needs to be of the same scheme
+      Preconditions.checkArgument(Objects.equals(fsURI.getScheme(), uri.getScheme()),
+                                  "Only URI with '%s' scheme is supported", fsURI.getScheme());
       return new HDFSLocation(this, new Path(fileSystem.getUri() + uri.getPath()));
     }
-    return new HDFSLocation(this, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
+
+    return create(uri.getPath());
   }
 
   @Override
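
Note on the create(URI) change above: the new check compares scheme and authority separately instead of comparing URI strings by prefix. The sketch below reproduces only that decision logic with plain java.net.URI, under the assumption that the three branches can be treated as a classification. UriMatchSketch, Kind and classify are hypothetical names and do not exist in Twill.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical helper, for illustration only; not part of Twill.
final class UriMatchSketch {

  enum Kind { FULL, ABSOLUTE_SAME_SCHEME, RELATIVE }

  // Classifies 'uri' against the file system URI 'base', following the
  // same three branches as the create(URI) method in the diff.
  static Kind classify(URI base, URI uri) {
    if (Objects.equals(base.getScheme(), uri.getScheme())
        && Objects.equals(base.getAuthority(), uri.getAuthority())) {
      // Same scheme and authority: a full URI on this file system.
      return Kind.FULL;
    }
    if (uri.isAbsolute()) {
      // Has a scheme but a different (or no) authority; the scheme must match.
      if (!Objects.equals(base.getScheme(), uri.getScheme())) {
        throw new IllegalArgumentException(
            "Only URI with '" + base.getScheme() + "' scheme is supported");
      }
      return Kind.ABSOLUTE_SAME_SCHEME;
    }
    // No scheme: resolved through the String-based create(path).
    return Kind.RELATIVE;
  }
}
```

With a base of hdfs://nn:8020, the URI hdfs://nn:80201/file shares a string prefix with the base but has a different authority, so it is not classified as FULL. That is one case a plain startsWith comparison would misjudge.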

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
new file mode 100644
index 0000000..e4c3774
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
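+ * Runs the {@link LocationTestBase} tests against a {@link FileContextLocationFactory} backed by a {@link MiniDFSCluster}.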
+ */
+public class FileContextLocationTest extends LocationTestBase {
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  }
+
+  @AfterClass
+  public static void finish() {
+    dfsCluster.shutdown();
+  }
+
+  @Override
+  protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+    return new FileContextLocationFactory(dfsCluster.getFileSystem().getConf(), pathBase);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
index 20f7403..d57d49f 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
@@ -30,14 +30,12 @@ import java.io.IOException;
 public class HDFSLocationTest extends LocationTestBase {
 
   private static MiniDFSCluster dfsCluster;
-  private static LocationFactory locationFactory;
 
   @BeforeClass
   public static void init() throws IOException {
     Configuration conf = new Configuration();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
     dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    locationFactory = new HDFSLocationFactory(dfsCluster.getFileSystem());
   }
 
   @AfterClass
@@ -46,7 +44,7 @@ public class HDFSLocationTest extends LocationTestBase {
   }
 
   @Override
-  protected LocationFactory getLocationFactory() {
-    return locationFactory;
+  protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+    return new HDFSLocationFactory(dfsCluster.getFileSystem(), pathBase);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
index 3f6d931..ba21beb 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
@@ -17,24 +17,17 @@
  */
 package org.apache.twill.filesystem;
 
-import org.junit.BeforeClass;
-
-import java.io.IOException;
+import java.io.File;
 
 /**
  *
  */
 public class LocalLocationTest extends LocationTestBase {
 
-  private static LocationFactory locationFactory;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    locationFactory = new LocalLocationFactory(tmpFolder.newFolder());
-  }
-
   @Override
-  protected LocationFactory getLocationFactory() {
-    return locationFactory;
+  protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+    File basePath = new File(tmpFolder.newFolder(), pathBase);
+    basePath.mkdirs();
+    return new LocalLocationFactory(basePath);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
index ee591e7..e01115b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
@@ -17,13 +17,23 @@
  */
 package org.apache.twill.filesystem;
 
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.io.CharStreams;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
 /**
@@ -34,10 +44,60 @@ public abstract class LocationTestBase {
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  private final LoadingCache<String, LocationFactory> locationFactoryCache = CacheBuilder.newBuilder()
+    .build(new CacheLoader<String, LocationFactory>() {
+      @Override
+      public LocationFactory load(String key) throws Exception {
+        return createLocationFactory(key);
+      }
+    });
+
+  @Test
+  public void testBasic() throws Exception {
+    LocationFactory factory = locationFactoryCache.getUnchecked("basic");
+    URI baseURI = factory.create("/").toURI();
+
+    // Test basic location construction
+    Assert.assertEquals(factory.create("/file"), factory.create("/file"));
+    Assert.assertEquals(factory.create("/file2"),
+                        factory.create(URI.create(baseURI.getScheme() + ":" + baseURI.getPath() + "/file2")));
+    Assert.assertEquals(factory.create("/file3"),
+                        factory.create(
+                          new URI(baseURI.getScheme(), baseURI.getAuthority(),
+                                  baseURI.getPath() + "/file3", null, null)));
+    Assert.assertEquals(factory.create("/"), factory.create("/"));
+    Assert.assertEquals(factory.create("/"), factory.create(URI.create(baseURI.getScheme() + ":" + baseURI.getPath())));
+
+    Assert.assertEquals(factory.create("/"),
+                        factory.create(new URI(baseURI.getScheme(), baseURI.getAuthority(),
+                                               baseURI.getPath(), null, null)));
+
+    // Test file creation and rename
+    Location location = factory.create("/file");
+    Assert.assertTrue(location.createNew());
+    Assert.assertTrue(location.exists());
+
+    Location location2 = factory.create("/file2");
+    String message = "Testing Message";
+    try (Writer writer = new OutputStreamWriter(location2.getOutputStream(), Charsets.UTF_8)) {
+      writer.write(message);
+    }
+    long length = location2.length();
+    long lastModified = location2.lastModified();
+
+    location2.renameTo(location);
+
+    Assert.assertFalse(location2.exists());
+    try (Reader reader = new InputStreamReader(location.getInputStream(), Charsets.UTF_8)) {
+      Assert.assertEquals(message, CharStreams.toString(reader));
+    }
+    Assert.assertEquals(length, location.length());
+    Assert.assertEquals(lastModified, location.lastModified());
+  }
 
   @Test
   public void testDelete() throws IOException {
-    LocationFactory factory = getLocationFactory();
+    LocationFactory factory = locationFactoryCache.getUnchecked("delete");
 
     Location base = factory.create("test").getTempFile(".tmp");
     Assert.assertTrue(base.mkdirs());
@@ -57,7 +117,7 @@ public abstract class LocationTestBase {
 
   @Test
   public void testHelper() {
-    LocationFactory factory = LocationFactories.namespace(getLocationFactory(), "testhelper");
+    LocationFactory factory = LocationFactories.namespace(locationFactoryCache.getUnchecked("helper"), "testhelper");
 
     Location location = factory.create("test");
     Assert.assertTrue(location.toURI().getPath().endsWith("testhelper/test"));
@@ -68,7 +128,7 @@ public abstract class LocationTestBase {
 
   @Test
   public void testList() throws IOException {
-    LocationFactory factory = getLocationFactory();
+    LocationFactory factory = locationFactoryCache.getUnchecked("list");
 
     Location dir = factory.create("dir");
 
@@ -107,5 +167,5 @@ public abstract class LocationTestBase {
     }
   }
 
-  protected abstract LocationFactory getLocationFactory();
+  protected abstract LocationFactory createLocationFactory(String pathBase) throws Exception;
 }


[19/22] incubator-twill git commit: Merge branch 'branch-0.7.0'

Posted by ch...@apache.org.
Merge branch 'branch-0.7.0'


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/c6b3f0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/c6b3f0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/c6b3f0d4

Branch: refs/heads/site
Commit: c6b3f0d43aff7cd4095edc79f3452eb640b37f79
Parents: 900e382 047e011
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:35:30 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:35:30 2016 -0800

----------------------------------------------------------------------
 pom.xml                      | 2 +-
 twill-api/pom.xml            | 2 +-
 twill-common/pom.xml         | 2 +-
 twill-core/pom.xml           | 2 +-
 twill-discovery-api/pom.xml  | 2 +-
 twill-discovery-core/pom.xml | 2 +-
 twill-examples/echo/pom.xml  | 2 +-
 twill-examples/pom.xml       | 2 +-
 twill-examples/yarn/pom.xml  | 2 +-
 twill-ext/pom.xml            | 2 +-
 twill-java8-test/pom.xml     | 2 +-
 twill-yarn/pom.xml           | 2 +-
 twill-zookeeper/pom.xml      | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[10/22] incubator-twill git commit: (TWILL-155) Sort file listed from directory in TwillLauncher

Posted by ch...@apache.org.
(TWILL-155) Sort file listed from directory in TwillLauncher

This makes the ClassLoader created by TwillLauncher behave
deterministically

This closes #71 on GitHub

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/87b063ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/87b063ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/87b063ca

Branch: refs/heads/site
Commit: 87b063cac642d4a605679049faa4e662b71cfc46
Parents: 161032a
Author: Terence Yim <ch...@apache.org>
Authored: Wed Nov 11 14:03:27 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Nov 11 22:20:05 2015 -0800

----------------------------------------------------------------------
 .../apache/twill/launcher/TwillLauncher.java    | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/87b063ca/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
index 3405709..171b22a 100644
--- a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
@@ -36,6 +36,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.jar.JarEntry;
@@ -142,18 +143,15 @@ public final class TwillLauncher {
 
   private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
     try {
-      List<URL> urls = new ArrayList<URL>();
+      List<URL> urls = new ArrayList<>();
       urls.add(dir.toURI().toURL());
       urls.add(new File(dir, "classes").toURI().toURL());
       urls.add(new File(dir, "resources").toURI().toURL());
 
       File libDir = new File(dir, "lib");
-      File[] files = libDir.listFiles();
-      if (files != null) {
-        for (File file : files) {
-          if (file.getName().endsWith(".jar")) {
-            urls.add(file.toURI().toURL());
-          }
+      for (File file : listFiles(libDir)) {
+        if (file.getName().endsWith(".jar")) {
+          urls.add(file.toURI().toURL());
         }
       }
 
@@ -163,7 +161,7 @@ public final class TwillLauncher {
 
       addClassPathsToList(urls, Constants.APPLICATION_CLASSPATH);
 
-      return new URLClassLoader(urls.toArray(new URL[0]));
+      return new URLClassLoader(urls.toArray(new URL[urls.size()]));
 
     } catch (Exception e) {
       throw new IllegalStateException(e);
@@ -190,12 +188,12 @@ public final class TwillLauncher {
     if (classpath.endsWith("/*")) {
       // Grab all .jar files
       File dir = new File(classpath.substring(0, classpath.length() - 2));
-      File[] files = dir.listFiles();
-      if (files == null || files.length == 0) {
+      List<File> files = listFiles(dir);
+      if (files.isEmpty()) {
         return singleItem(dir.toURI().toURL());
       }
 
-      List<URL> result = new ArrayList<URL>(files.length);
+      List<URL> result = new ArrayList<>(files.size());
       for (File file : files) {
         if (file.getName().endsWith(".jar")) {
           result.add(file.toURI().toURL());
@@ -208,7 +206,7 @@ public final class TwillLauncher {
   }
 
   private static Collection<URL> singleItem(URL url) {
-    List<URL> result = new ArrayList<URL>(1);
+    List<URL> result = new ArrayList<>(1);
     result.add(url);
     return result;
   }
@@ -233,4 +231,18 @@ public final class TwillLauncher {
     }
     dir.delete();
   }
+
+  /**
+   * Returns a sorted list of {@link File} under the given directory. The list will be empty if
+   * the given directory is empty, does not exist, or is not a directory.
+   */
+  private static List<File> listFiles(File dir) {
+    File[] files = dir.listFiles();
+    if (files == null || files.length == 0) {
+      return Collections.emptyList();
+    }
+    List<File> fileList = Arrays.asList(files);
+    Collections.sort(fileList);
+    return fileList;
+  }
 }
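
For readers skimming the TWILL-155 change above, the idea of sorting a directory listing before building a classpath can be sketched on its own. This is a minimal illustration rather than the TwillLauncher code: the class name SortedListingSketch and the helpers listSorted, jarNames and demo are hypothetical, and the sketch sorts the array with Arrays.sort instead of calling Collections.sort on the list.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch class, not part of Twill.
final class SortedListingSketch {

  // Returns the entries of dir sorted by pathname, or an empty list if dir
  // is empty, does not exist, or is not a directory.
  static List<File> listSorted(File dir) {
    File[] files = dir.listFiles();   // null when dir is missing or not a directory
    if (files == null || files.length == 0) {
      return Collections.emptyList();
    }
    Arrays.sort(files);               // File implements Comparable<File>
    return Arrays.asList(files);
  }

  // Collects the names of the .jar entries, preserving the sorted order.
  static List<String> jarNames(File dir) {
    List<String> names = new ArrayList<>();
    for (File file : listSorted(dir)) {
      if (file.getName().endsWith(".jar")) {
        names.add(file.getName());
      }
    }
    return names;
  }

  // Creates a temporary directory with files in arbitrary order and lists its jars.
  static List<String> demo() {
    try {
      File dir = Files.createTempDirectory("lib").toFile();
      for (String name : new String[] {"c.jar", "a.jar", "notes.txt", "b.jar"}) {
        new File(dir, name).createNewFile();
      }
      return jarNames(dir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because File.listFiles() makes no ordering guarantee, sorting first means the URL order handed to a class loader no longer depends on the file system.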


[02/22] incubator-twill git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/4b7c8481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/4b7c8481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/4b7c8481

Branch: refs/heads/site
Commit: 4b7c8481f60a5fc46faaee04cf4c48ba277ead82
Parents: 81a5e46 d2d9081
Author: Terence Yim <ch...@apache.org>
Authored: Thu Aug 6 17:07:57 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Aug 6 17:07:57 2015 -0700

----------------------------------------------------------------------
 pom.xml | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4b7c8481/pom.xml
----------------------------------------------------------------------


[13/22] incubator-twill git commit: (TWILL-160) Don’t create parent directory when getting InputStream from a LocalLocation

Posted by ch...@apache.org.
(TWILL-160) Don’t create parent directory when getting InputStream from a LocalLocation

This closes #75 on Github

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b80f9b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b80f9b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b80f9b80

Branch: refs/heads/site
Commit: b80f9b8036b59603f227ba1ae43aa099d5329135
Parents: 65c730b
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 4 12:57:18 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 11:21:05 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/twill/filesystem/LocalLocation.java | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b80f9b80/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index a873545..037f4f9 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -70,10 +70,6 @@ final class LocalLocation implements Location {
    */
   @Override
   public InputStream getInputStream() throws IOException {
-    File parent = file.getParentFile();
-    if (!parent.exists()) {
-      parent.mkdirs();
-    }
     return new FileInputStream(file);
   }
 

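The effect of the TWILL-160 change above can be illustrated with plain java.io: opening a FileInputStream on a missing file fails without creating anything on disk. This is a minimal sketch under that assumption, not the LocalLocation class itself; ReadOnlyOpenSketch, open and parentStaysAbsent are hypothetical names.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical sketch class, not part of Twill.
final class ReadOnlyOpenSketch {

  // Opens the file for reading; does not create the file or its parent directory.
  static InputStream open(File file) throws IOException {
    return new FileInputStream(file);
  }

  // Returns true if a failed read of a missing file leaves its parent directory absent.
  static boolean parentStaysAbsent() {
    try {
      File base = Files.createTempDirectory("loc").toFile();
      File file = new File(new File(base, "missing-parent"), "data.txt");
      try (InputStream in = open(file)) {
        return false;                          // not expected: the file does not exist
      } catch (FileNotFoundException expected) {
        return !file.getParentFile().exists(); // the read attempt had no side effect
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parentStaysAbsent());
  }
}
```

A read path that never calls mkdirs() keeps "get an input stream" free of side effects, which is what the commit subject asks for.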

[18/22] incubator-twill git commit: Prepare for releasing 0.7.0-incubating

Posted by ch...@apache.org.
Prepare for releasing 0.7.0-incubating


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/047e011f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/047e011f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/047e011f

Branch: refs/heads/site
Commit: 047e011f25066805eb820ff661364937ef7ef043
Parents: 900e382
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:17:37 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:17:37 2016 -0800

----------------------------------------------------------------------
 pom.xml                      | 2 +-
 twill-api/pom.xml            | 2 +-
 twill-common/pom.xml         | 2 +-
 twill-core/pom.xml           | 2 +-
 twill-discovery-api/pom.xml  | 2 +-
 twill-discovery-core/pom.xml | 2 +-
 twill-examples/echo/pom.xml  | 2 +-
 twill-examples/pom.xml       | 2 +-
 twill-examples/yarn/pom.xml  | 2 +-
 twill-ext/pom.xml            | 2 +-
 twill-java8-test/pom.xml     | 2 +-
 twill-yarn/pom.xml           | 2 +-
 twill-zookeeper/pom.xml      | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4daa68d..c116322 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <groupId>org.apache.twill</groupId>
     <artifactId>twill-parent</artifactId>
-    <version>0.7.0-incubating-SNAPSHOT</version>
+    <version>0.7.0-incubating</version>
     <packaging>pom</packaging>
     <name>Apache Twill</name>
     <url>http://twill.incubator.apache.org</url>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-api/pom.xml b/twill-api/pom.xml
index c011dee..3cb6d97 100644
--- a/twill-api/pom.xml
+++ b/twill-api/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <artifactId>twill-api</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index b371acb..d1acb55 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index 26ae718..e6b5514 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
index 147c6bc..a38aff3 100644
--- a/twill-discovery-api/pom.xml
+++ b/twill-discovery-api/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
index fb3dcbb..f80510d 100644
--- a/twill-discovery-core/pom.xml
+++ b/twill-discovery-core/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/echo/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/echo/pom.xml b/twill-examples/echo/pom.xml
index 8ba89ff..474127e 100644
--- a/twill-examples/echo/pom.xml
+++ b/twill-examples/echo/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-examples</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <name>Apache Twill examples: Echo</name>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/pom.xml b/twill-examples/pom.xml
index 10163c2..3e2929b 100644
--- a/twill-examples/pom.xml
+++ b/twill-examples/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <artifactId>twill-examples</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/yarn/pom.xml b/twill-examples/yarn/pom.xml
index 71f86e8..dea3777 100644
--- a/twill-examples/yarn/pom.xml
+++ b/twill-examples/yarn/pom.xml
@@ -24,7 +24,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-examples</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <name>Apache Twill examples: YARN</name>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-ext/pom.xml
----------------------------------------------------------------------
diff --git a/twill-ext/pom.xml b/twill-ext/pom.xml
index 539fd20..0eb45cd 100644
--- a/twill-ext/pom.xml
+++ b/twill-ext/pom.xml
@@ -22,7 +22,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-java8-test/pom.xml
----------------------------------------------------------------------
diff --git a/twill-java8-test/pom.xml b/twill-java8-test/pom.xml
index 892a025..65b1c6e 100644
--- a/twill-java8-test/pom.xml
+++ b/twill-java8-test/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
 
     <artifactId>twill-java8-test</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 9d61831..1df959a 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 6cf302d..9edc990 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating-SNAPSHOT</version>
+        <version>0.7.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 


[04/22] incubator-twill git commit: (TWILL-141) Fix namespacing of ZKClient

Posted by ch...@apache.org.
(TWILL-141) Fix namespacing of ZKClient

- Do not fail with an exception when creating "/" through the
  namespaced ZKClient
- Return the correct path in OperationFuture.getRequestPath() for
  futures returned from namespaced ZKClient

This closes #64 from GitHub.

Signed-off-by: hsaputra <hs...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/f88e18f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/f88e18f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/f88e18f7

Branch: refs/heads/site
Commit: f88e18f7587aae3528b1e47a22b0b281ff91f95e
Parents: 66402b4
Author: Terence Yim <ch...@apache.org>
Authored: Thu Sep 24 03:22:24 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Fri Sep 25 12:17:53 2015 -0700

----------------------------------------------------------------------
 .../internal/zookeeper/NamespaceZKClient.java   | 33 ++++++++----
 .../zookeeper/SettableOperationFuture.java      |  2 +-
 .../apache/twill/zookeeper/ZKClientTest.java    | 55 ++++++++++++++++++++
 3 files changed, 78 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
index e19bb0a..239a656 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
@@ -70,47 +70,58 @@ public final class NamespaceZKClient extends ForwardingZKClient {
   @Override
   public OperationFuture<String> create(String path, @Nullable byte[] data,
                                         CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
-    return relayPath(delegate.create(namespace + path, data, createMode, createParent, acl),
+    return relayPath(delegate.create(getNamespacedPath(path), data, createMode, createParent, acl),
                      this.<String>createFuture(path));
   }
 
   @Override
   public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
+    return relayFuture(delegate.exists(getNamespacedPath(path), watcher), this.<Stat>createFuture(path));
   }
 
   @Override
   public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
+    return relayFuture(delegate.getChildren(getNamespacedPath(path), watcher), this.<NodeChildren>createFuture(path));
   }
 
   @Override
   public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
+    return relayFuture(delegate.getData(getNamespacedPath(path), watcher), this.<NodeData>createFuture(path));
   }
 
   @Override
   public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
+    return relayFuture(delegate.setData(getNamespacedPath(dataPath), data, version), this.<Stat>createFuture(dataPath));
   }
 
   @Override
   public OperationFuture<String> delete(String deletePath, int version) {
-    return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
+    return relayPath(delegate.delete(getNamespacedPath(deletePath), version), this.<String>createFuture(deletePath));
   }
 
   @Override
   public OperationFuture<ACLData> getACL(String path) {
-    return relayFuture(delegate.getACL(namespace + path), this.<ACLData>createFuture(path));
+    return relayFuture(delegate.getACL(getNamespacedPath(path)), this.<ACLData>createFuture(path));
   }
 
   @Override
   public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
-    return relayFuture(delegate.setACL(namespace + path, acl, version), this.<Stat>createFuture(path));
+    return relayFuture(delegate.setACL(getNamespacedPath(path), acl, version), this.<Stat>createFuture(path));
+  }
+
+  /**
+   * Returns the namespaced path for the given path. The returned path should be used when performing
+   * ZK operations with the delegating ZKClient.
+   */
+  private String getNamespacedPath(String path) {
+    if ("/".equals(path)) {
+      return namespace;
+    }
+    return namespace + path;
   }
 
   private <V> SettableOperationFuture<V> createFuture(String path) {
-    return SettableOperationFuture.create(namespace + path, Threads.SAME_THREAD_EXECUTOR);
+    return SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
   }
 
   private <V> OperationFuture<V> relayFuture(final OperationFuture<V> from, final SettableOperationFuture<V> to) {
@@ -134,8 +145,8 @@ public final class NamespaceZKClient extends ForwardingZKClient {
       @Override
       public void run() {
         try {
-          String path = from.get();
-          to.set(path.substring(namespace.length()));
+          String relativePath = from.get().substring(namespace.length());
+          to.set(relativePath.isEmpty() ? "/" : relativePath);
         } catch (Exception e) {
           to.setException(e.getCause());
         }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
index 06f089e..f98b8f6 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
@@ -35,7 +35,7 @@ public final class SettableOperationFuture<V> extends AbstractFuture<V> implemen
   private final Executor executor;
 
   public static <V> SettableOperationFuture<V> create(String path, Executor executor) {
-    return new SettableOperationFuture<V>(path, executor);
+    return new SettableOperationFuture<>(path, executor);
   }
 
   private SettableOperationFuture(String requestPath, Executor executor) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index a9120c3..b9cb8a4 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -394,4 +394,59 @@ public class ZKClientTest {
       serverThread.interrupt();
     }
   }
+
+  @Test
+  public void testNamespace() throws ExecutionException, InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService zkClient = ZKClientService.Builder
+        .of(zkServer.getConnectionStr())
+        .build();
+      zkClient.startAndWait();
+
+      ZKClient zk = ZKClients.namespace(zkClient, "/test");
+      // Creating "/" should create "/test" from the root
+      OperationFuture<String> createFuture = zk.create("/", null, CreateMode.PERSISTENT);
+      // Shouldn't have namespace as prefix for path returned from the future.
+      Assert.assertEquals("/", createFuture.getRequestPath());
+      Assert.assertEquals("/", createFuture.get());
+
+      // Create a path under the namespace
+      createFuture = zk.create("/subpath", null, CreateMode.PERSISTENT);
+      Assert.assertEquals("/subpath", createFuture.getRequestPath());
+      Assert.assertEquals("/subpath", createFuture.get());
+
+      // Check for exists
+      OperationFuture<Stat> existsFuture = zk.exists("/subpath");
+      Assert.assertEquals("/subpath", existsFuture.getRequestPath());
+      Assert.assertNotNull(existsFuture.get());
+
+      // Put some data
+      OperationFuture<Stat> setFuture = zk.setData("/subpath", "hello".getBytes());
+      Assert.assertEquals("/subpath", setFuture.getRequestPath());
+      Assert.assertNotNull(setFuture.get());
+
+      // Read the data back
+      OperationFuture<NodeData> getFuture = zk.getData("/subpath");
+      Assert.assertEquals("/subpath", getFuture.getRequestPath());
+      Assert.assertArrayEquals("hello".getBytes(), getFuture.get().getData());
+
+      // Delete the sub path
+      OperationFuture<String> deleteFuture = zk.delete("/subpath");
+      Assert.assertEquals("/subpath", deleteFuture.getRequestPath());
+      Assert.assertEquals("/subpath", deleteFuture.get());
+
+      // Delete the namespace root
+      deleteFuture = zk.delete("/");
+      Assert.assertEquals("/", deleteFuture.getRequestPath());
+      Assert.assertEquals("/", deleteFuture.get());
+
+      // The namespace must be gone
+      Assert.assertNull(zkClient.exists("/test").get());
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
 }
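
The path mapping that the TWILL-141 change above introduces can be shown in isolation, without a ZooKeeper server. This is a minimal sketch that mirrors the logic of getNamespacedPath and the relayPath fix from the diff; the class NamespacedPathSketch and its two static helpers are hypothetical names chosen for illustration.

```java
// Hypothetical sketch class, not part of Twill.
final class NamespacedPathSketch {

  // Maps a client-visible path to the real path under the namespace.
  // The root "/" maps to the namespace itself, so no trailing slash is produced.
  static String toNamespaced(String namespace, String path) {
    return "/".equals(path) ? namespace : namespace + path;
  }

  // Maps a real path back to the client-visible path.
  // The namespace node itself is reported as "/".
  static String fromNamespaced(String namespace, String fullPath) {
    String relative = fullPath.substring(namespace.length());
    return relative.isEmpty() ? "/" : relative;
  }

  public static void main(String[] args) {
    String ns = "/test";
    System.out.println(toNamespaced(ns, "/"));                 // namespace root
    System.out.println(toNamespaced(ns, "/subpath"));          // child of the namespace
    System.out.println(fromNamespaced(ns, "/test"));           // back to "/"
    System.out.println(fromNamespaced(ns, "/test/subpath"));   // back to "/subpath"
  }
}
```

Plain concatenation would turn "/" into "/test/", which is the trailing-slash case the first bullet of the commit message is about.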


[07/22] incubator-twill git commit: Remove the if-check in the YarnTwillPreparer class for log level when setting env key.

Posted by ch...@apache.org.
Remove the if-check in the YarnTwillPreparer class for log level when setting env key.

Add a precondition check in the setLogLevel method to reject null early.

This closes #67

Signed-off-by: hsaputra <hs...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/f6d2b6c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/f6d2b6c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/f6d2b6c4

Branch: refs/heads/site
Commit: f6d2b6c427bd7ccec3951cfdeeb2b0e2db12e6da
Parents: e95c6a4
Author: hsaputra <hs...@apache.org>
Authored: Fri Oct 9 13:23:28 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Fri Oct 9 20:31:30 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/twill/yarn/YarnTwillPreparer.java    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f6d2b6c4/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 6da2f8b..d4edfeb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -300,6 +300,7 @@ final class YarnTwillPreparer implements TwillPreparer {
 
   @Override
   public TwillPreparer setLogLevel(LogEntry.Level logLevel) {
+    Preconditions.checkNotNull(logLevel);
     this.logLevel = logLevel;
     return this;
   }
@@ -350,10 +351,8 @@ final class YarnTwillPreparer implements TwillPreparer {
             .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
             .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
 
-          if (logLevel != null) {
-            LOG.debug("Log level is set to {} for the Twill application.", logLevel);
-            builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
-          }
+          LOG.debug("Log level is set to {} for the Twill application.", logLevel);
+          builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
 
           int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
                                                     Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
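
The pattern in the commit above, validating in the setter so later code can use the field without a null check, can be sketched with the JDK alone. This is an illustration under stated assumptions: it swaps Guava's Preconditions.checkNotNull for java.util.Objects.requireNonNull (both throw NullPointerException on null), and the class LogLevelHolderSketch, its Level enum and the INFO default are hypothetical.

```java
import java.util.Objects;

// Hypothetical sketch class, not part of Twill.
final class LogLevelHolderSketch {

  enum Level { DEBUG, INFO, WARN, ERROR }

  // Assumed default so the field is never null, even if the setter is not called.
  private Level logLevel = Level.INFO;

  // Rejects null at the call site instead of failing later when the value is used.
  LogLevelHolderSketch setLogLevel(Level level) {
    this.logLevel = Objects.requireNonNull(level, "logLevel");
    return this;
  }

  // Safe without a null check because the setter guards the field.
  String envValue() {
    return logLevel.toString();
  }

  // Returns true if passing null to the setter is rejected immediately.
  static boolean rejectsNull() {
    try {
      new LogLevelHolderSketch().setLogLevel(null);
      return false;
    } catch (NullPointerException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(new LogLevelHolderSketch().setLogLevel(Level.DEBUG).envValue());
    System.out.println(rejectsNull());
  }
}
```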


[06/22] incubator-twill git commit: (TWILL-148) Allow setting of env variables - Added methods to TwillPreparer for setting env for runnables

Posted by ch...@apache.org.
(TWILL-148) Allow setting of env variables - Added methods to TwillPreparer for setting env for runnables

This closes #69 on github

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e95c6a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e95c6a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e95c6a49

Branch: refs/heads/site
Commit: e95c6a495e0faef7569adbeae2d768f57391b44f
Parents: ef8b1ea
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 9 18:27:17 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 9 19:34:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillPreparer.java     |  20 ++++
 .../org/apache/twill/internal/Constants.java    |   1 +
 .../appmaster/ApplicationMasterService.java     |  46 +++++---
 .../apache/twill/yarn/YarnTwillPreparer.java    |  55 ++++++++-
 .../apache/twill/yarn/EnvironmentTestRun.java   | 111 +++++++++++++++++++
 5 files changed, 219 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index f60080a..d7d5529 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -21,6 +21,7 @@ import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 
 import java.net.URI;
+import java.util.Map;
 
 /**
  * This interface exposes methods to set up the Twill runtime environment and start a Twill application.
@@ -179,6 +180,25 @@ public interface TwillPreparer {
   TwillPreparer withClassPaths(Iterable<String> classPaths);
 
   /**
+   * Adds the set of environment variables that will be set as container environment variables for all runnables.
+   *
+   * @param env set of environment variables
+   * @return This {@link TwillPreparer}
+   */
+  TwillPreparer withEnv(Map<String, String> env);
+
+  /**
+   * Adds the set of environment variables that will be set as container environment variables for the given runnable.
+   * Environment variables set through this method have higher precedence than those set through {@link #withEnv(Map)}
+   * if there is a key clash.
+   *
+   * @param runnableName Name of the {@link TwillRunnable}.
+   * @param env set of environment variables
+   * @return This {@link TwillPreparer}
+   */
+  TwillPreparer withEnv(String runnableName, Map<String, String> env);
+
+  /**
    * Adds the set of paths to the classpath on the target machine for ApplicationMaster and all runnables.
    * @return This {@link TwillPreparer}
    */

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index f897bfa..39de851 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -62,6 +62,7 @@ public final class Constants {
     public static final String LOCALIZE_FILES = "localizeFiles.json";
     public static final String TWILL_SPEC = "twillSpec.json";
     public static final String ARGUMENTS = "arguments.json";
+    public static final String ENVIRONMENTS = "environments.json";
     public static final String LOGBACK_TEMPLATE = "logback-template.xml";
     public static final String JVM_OPTIONS = "jvm.opts";
     public static final String CREDENTIALS = "credentials.store";

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 355cea3..e1523d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.DiscreteDomains;
 import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -88,7 +87,9 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -122,6 +123,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   private final EventHandler eventHandler;
   private final Location applicationLocation;
   private final PlacementPolicyManager placementPolicyManager;
+  private final Map<String, Map<String, String>> environments;
 
   private volatile boolean stopped;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -140,6 +142,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     this.jvmOpts = loadJvmOptions();
     this.reservedMemory = getReservedMemory();
     this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
+    this.environments = getEnvironments();
 
     this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
                                                         Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -634,18 +637,22 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
       int containerCount = expectedContainers.getExpected(runnableName);
 
-      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(
-        ImmutableMap.<String, String>builder()
-          .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR))
-          .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER))
-          .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId())
-          .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
-          .put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL))
-          .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
-          .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect())
-          .build()
-        , getLocalizeFiles(), credentials
-      );
+      // Setup container environment variables
+      Map<String, String> env = new LinkedHashMap<>();
+      if (environments.containsKey(runnableName)) {
+        env.putAll(environments.get(runnableName));
+      }
+      // Override with system env
+      env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR));
+      env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER));
+      env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
+      env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
+      env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
+      env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+      env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
+
+      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
+                                                                                         credentials);
 
       TwillContainerLauncher launcher = new TwillContainerLauncher(
         twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
@@ -679,6 +686,19 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     }
   }
 
+  private Map<String, Map<String, String>> getEnvironments() {
+    File envFile = new File(Constants.Files.ENVIRONMENTS);
+    if (!envFile.exists()) {
+      return new HashMap<>();
+    }
+
+    try (Reader reader = Files.newReader(envFile, Charsets.UTF_8)) {
+      return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
   private String getZKNamespace(String runnableName) {
     return String.format("/%s/runnables/%s", runId.getId(), runnableName);
   }
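
The layering in the ApplicationMasterService change above (user-supplied variables first, then the Twill-managed keys written on top) can be sketched in isolation. This is only an illustrative model, not the Twill code: the class name ContainerEnvSketch, the method buildContainerEnv and the literal key "TWILL_APP_NAME" are assumptions made for the example, since the real key strings live in EnvKeys and are not shown in this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone model of the container environment layering; not part of Twill.
final class ContainerEnvSketch {

  // Copies the per-runnable user environment first, then writes the system
  // entries on top, so a system key always wins over a user key of the same name.
  static Map<String, String> buildContainerEnv(Map<String, Map<String, String>> environments,
                                               String runnableName,
                                               Map<String, String> systemEnv) {
    Map<String, String> env = new LinkedHashMap<>();
    Map<String, String> userEnv = environments.get(runnableName);
    if (userEnv != null) {
      env.putAll(userEnv);
    }
    env.putAll(systemEnv);
    return env;
  }

  public static void main(String[] args) {
    Map<String, String> userEnv = new LinkedHashMap<>();
    userEnv.put("GREETING", "Hello2");
    // Assumed key name, used only to show that the system value replaces it.
    userEnv.put("TWILL_APP_NAME", "user-supplied");

    Map<String, Map<String, String>> environments = new LinkedHashMap<>();
    environments.put("echo2", userEnv);

    Map<String, String> systemEnv = new LinkedHashMap<>();
    systemEnv.put("TWILL_APP_NAME", "EchoApp");

    System.out.println(buildContainerEnv(environments, "echo2", systemEnv));
    System.out.println(buildContainerEnv(environments, "echo1", systemEnv));
  }
}
```

A runnable without an entry in the map simply receives the system entries, which matches the containsKey guard in the patch.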

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index ea77116..6da2f8b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.OutputSupplier;
 import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -52,7 +53,6 @@ import org.apache.twill.api.TwillPreparer;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.filesystem.HDFSLocationFactory;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.filesystem.LocationFactory;
 import org.apache.twill.internal.ApplicationBundler;
@@ -97,6 +97,8 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,6 +127,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final List<URI> resources = Lists.newArrayList();
   private final List<String> classPaths = Lists.newArrayList();
   private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
+  private final Map<String, Map<String, String>> environments = new HashMap<>();
   private final List<String> applicationClassPaths = Lists.newArrayList();
   private final Credentials credentials;
   private final int reservedMemory;
@@ -214,6 +217,8 @@ final class YarnTwillPreparer implements TwillPreparer {
 
   @Override
   public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
+    Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+                                "Runnable %s is not defined in the application.", runnableName);
     runnableArgs.putAll(runnableName, args);
     return this;
   }
@@ -252,6 +257,23 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   @Override
+  public TwillPreparer withEnv(Map<String, String> env) {
+    // Add the given environments to all runnables
+    for (String runnableName : twillSpec.getRunnables().keySet()) {
+      setEnv(runnableName, env, false);
+    }
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withEnv(String runnableName, Map<String, String> env) {
+    Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+                                "Runnable %s is not defined in the application.", runnableName);
+    setEnv(runnableName, env, true);
+    return this;
+  }
+
+  @Override
   public TwillPreparer withApplicationClassPaths(String... classPaths) {
     return withApplicationClassPaths(ImmutableList.copyOf(classPaths));
   }
@@ -306,6 +328,7 @@ final class YarnTwillPreparer implements TwillPreparer {
           saveLauncher(localFiles);
           saveJvmOptions(localFiles);
           saveArguments(new Arguments(arguments, runnableArgs), localFiles);
+          saveEnvironments(localFiles);
           saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
                                                      Constants.Files.LOGBACK_TEMPLATE,
                                                      Constants.Files.CONTAINER_JAR,
@@ -360,6 +383,21 @@ final class YarnTwillPreparer implements TwillPreparer {
     }
   }
 
+  private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
+    Map<String, String> environment = environments.get(runnableName);
+    if (environment == null) {
+      environment = new LinkedHashMap<>(env);
+      environments.put(runnableName, environment);
+      return;
+    }
+
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      if (overwrite || !environment.containsKey(entry.getKey())) {
+        environment.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
   private Credentials createCredentials() {
     Credentials credentials = new Credentials();
 
@@ -596,6 +634,21 @@ final class YarnTwillPreparer implements TwillPreparer {
     localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location));
   }
 
+  private void saveEnvironments(Map<String, LocalFile> localFiles) throws IOException {
+    if (environments.isEmpty()) {
+      return;
+    }
+
+    LOG.debug("Create and copy {}", Constants.Files.ENVIRONMENTS);
+    final Location location = createTempLocation(Constants.Files.ENVIRONMENTS);
+    try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+      new Gson().toJson(environments, writer);
+    }
+    LOG.debug("Done {}", Constants.Files.ENVIRONMENTS);
+
+    localFiles.put(Constants.Files.ENVIRONMENTS, createLocalFile(Constants.Files.ENVIRONMENTS, location));
+  }
+
   /**
    * Serializes the list of files that needs to localize from AM to Container.
    */
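
The precedence rule implemented by setEnv in the YarnTwillPreparer change above can be exercised without a YARN cluster. The sketch below is a simplified stand-in that uses only java.util; the class EnvPrecedenceSketch and its envFor accessor are hypothetical names introduced for illustration, and only the overwrite logic is meant to mirror the patch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the preparer's environment bookkeeping; not the Twill class.
final class EnvPrecedenceSketch {
  private final List<String> runnables;
  private final Map<String, Map<String, String>> environments = new LinkedHashMap<>();

  EnvPrecedenceSketch(List<String> runnables) {
    this.runnables = runnables;
  }

  // Application-wide variables: applied to every runnable, never replacing an existing key.
  EnvPrecedenceSketch withEnv(Map<String, String> env) {
    for (String name : runnables) {
      setEnv(name, env, false);
    }
    return this;
  }

  // Per-runnable variables: always replace an existing key for that runnable.
  EnvPrecedenceSketch withEnv(String runnableName, Map<String, String> env) {
    if (!runnables.contains(runnableName)) {
      throw new IllegalArgumentException("Runnable " + runnableName + " is not defined in the application.");
    }
    setEnv(runnableName, env, true);
    return this;
  }

  private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
    Map<String, String> environment = environments.get(runnableName);
    if (environment == null) {
      environment = new LinkedHashMap<>();
      environments.put(runnableName, environment);
    }
    for (Map.Entry<String, String> entry : env.entrySet()) {
      if (overwrite || !environment.containsKey(entry.getKey())) {
        environment.put(entry.getKey(), entry.getValue());
      }
    }
  }

  Map<String, String> envFor(String runnableName) {
    Map<String, String> env = environments.get(runnableName);
    return env == null ? Collections.<String, String>emptyMap() : env;
  }

  public static void main(String[] args) {
    EnvPrecedenceSketch sketch = new EnvPrecedenceSketch(Arrays.asList("echo1", "echo2"))
      .withEnv("echo2", Collections.singletonMap("GREETING", "Hello2"))
      .withEnv(Collections.singletonMap("GREETING", "Hello"));
    System.out.println(sketch.envFor("echo1"));  // {GREETING=Hello}
    System.out.println(sketch.envFor("echo2"));  // {GREETING=Hello2}
  }
}
```

Because the application-wide call never overwrites, the per-runnable value wins regardless of the order in which the two withEnv calls are made.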

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
new file mode 100644
index 0000000..4309cb4
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for testing environment settings.
+ */
+public class EnvironmentTestRun extends BaseYarnTest {
+
+  @Test
+  public void testEnv() throws Exception {
+    TwillRunner runner = getTwillRunner();
+
+    TwillController controller = runner.prepare(new EchoApp())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("echo")
+      .withArguments("echo1", "echo1")
+      .withArguments("echo2", "echo2")
+      .withEnv(ImmutableMap.of("GREETING", "Hello"))
+      .withEnv("echo2", ImmutableMap.of("GREETING", "Hello2"))
+      .start();
+
+    // Service echo1 should return "Hello" as greeting, echo2 should return "Hello2"
+    Map<String, String> runnableGreetings = ImmutableMap.of("echo1", "Hello", "echo2", "Hello2");
+    for (Map.Entry<String, String> entry : runnableGreetings.entrySet()) {
+      Discoverable discoverable = getDiscoverable(controller.discoverService(entry.getKey()), 60, TimeUnit.SECONDS);
+      try (
+        Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+                                   discoverable.getSocketAddress().getPort())
+      ) {
+        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+        writer.println("GREETING");
+        Assert.assertEquals(entry.getValue(), reader.readLine());
+      }
+    }
+
+    controller.terminate().get();
+  }
+
+  private Discoverable getDiscoverable(ServiceDiscovered serviceDiscovered,
+                                       long timeout, TimeUnit unit) throws Exception {
+    final SettableFuture<Discoverable> completion = SettableFuture.create();
+    serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+      @Override
+      public void onChange(ServiceDiscovered serviceDiscovered) {
+        Iterator<Discoverable> itor = serviceDiscovered.iterator();
+        if (itor.hasNext()) {
+          completion.set(itor.next());
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+    return completion.get(timeout, unit);
+  }
+
+  /**
+   * Application to add two {@link EnvironmentEchoServer} for testing.
+   */
+  public static final class EchoApp implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("EchoApp")
+        .withRunnable()
+          .add("echo1", new EnvironmentEchoServer()).noLocalFiles()
+          .add("echo2", new EnvironmentEchoServer()).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+}


[22/22] incubator-twill git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/87e2b465
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/87e2b465
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/87e2b465

Branch: refs/heads/site
Commit: 87e2b4652ec6da690a8dbad6de88080bc9cf6154
Parents: 4b7c848 180e446
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 26 11:15:56 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 26 11:15:56 2016 -0800

----------------------------------------------------------------------
 NOTICE                                          |   2 +-
 pom.xml                                         |   4 +-
 twill-api/pom.xml                               |   2 +-
 .../org/apache/twill/api/TwillPreparer.java     |  20 ++
 twill-common/pom.xml                            |   2 +-
 .../apache/twill/filesystem/LocalLocation.java  |  15 +-
 .../twill/filesystem/LocalLocationFactory.java  |   4 +-
 .../org/apache/twill/internal/Constants.java    |  83 +++++++
 twill-core/pom.xml                              |   2 +-
 .../twill/internal/AbstractTwillService.java    |   2 +-
 .../internal/AbstractZKServiceController.java   |   3 +-
 .../org/apache/twill/internal/Constants.java    |  76 -------
 .../apache/twill/internal/ContainerInfo.java    |  15 +-
 .../apache/twill/internal/ProcessLauncher.java  |   2 +-
 .../twill/internal/ResourceCapability.java      |  34 +++
 .../twill/internal/TwillContainerLauncher.java  |  19 +-
 .../internal/json/TwillRunResourcesCodec.java   |  14 +-
 .../kafka/client/SimpleKafkaConsumer.java       |  63 +++++-
 .../apache/twill/internal/utils/Resources.java  |  46 ++++
 .../apache/twill/launcher/TwillLauncher.java    |  36 ++-
 .../apache/twill/kafka/client/KafkaTest.java    |  68 ++++++
 twill-discovery-api/pom.xml                     |   2 +-
 twill-discovery-core/pom.xml                    |   2 +-
 .../twill/discovery/ZKDiscoveryService.java     |   4 +-
 .../twill/discovery/ZKDiscoveryServiceTest.java |   2 +-
 twill-examples/echo/pom.xml                     |   2 +-
 twill-examples/pom.xml                          |   2 +-
 twill-examples/yarn/pom.xml                     |   2 +-
 twill-ext/pom.xml                               |   2 +-
 twill-java8-test/pom.xml                        |   2 +-
 twill-yarn/pom.xml                              |   2 +-
 .../internal/yarn/Hadoop20YarnAppClient.java    |  23 +-
 .../internal/yarn/Hadoop21YarnAppClient.java    |  24 +-
 .../twill/filesystem/FileContextLocation.java   | 219 +++++++++++++++++++
 .../filesystem/FileContextLocationFactory.java  | 119 ++++++++++
 .../apache/twill/filesystem/HDFSLocation.java   |   4 +-
 .../twill/filesystem/HDFSLocationFactory.java   |  18 +-
 .../org/apache/twill/internal/ServiceMain.java  |  40 ++--
 .../appmaster/ApplicationMasterInfo.java        |  63 ++++++
 .../appmaster/ApplicationMasterMain.java        |  89 +++++++-
 .../ApplicationMasterProcessLauncher.java       |  24 +-
 .../appmaster/ApplicationMasterService.java     |  74 +++++--
 .../appmaster/ApplicationSubmitter.java         |   3 +-
 .../internal/container/TwillContainerMain.java  |   5 +-
 .../yarn/AbstractYarnProcessLauncher.java       |   3 +-
 .../twill/internal/yarn/YarnAppClient.java      |  11 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |  87 ++++++--
 .../twill/yarn/YarnTwillRunnerService.java      |   4 +-
 .../filesystem/FileContextLocationTest.java     |  50 +++++
 .../twill/filesystem/HDFSLocationTest.java      |   6 +-
 .../twill/filesystem/LocalLocationTest.java     |  17 +-
 .../twill/filesystem/LocationTestBase.java      |  68 +++++-
 .../org/apache/twill/yarn/BaseYarnTest.java     |   4 +
 .../apache/twill/yarn/ContainerSizeTestRun.java |  83 ++++++-
 .../apache/twill/yarn/EchoServerTestRun.java    |  68 +++++-
 .../apache/twill/yarn/EnvironmentTestRun.java   | 111 ++++++++++
 .../twill/yarn/ServiceDiscoveryTestRun.java     |  91 ++++----
 .../org/apache/twill/yarn/SocketServer.java     |  13 +-
 .../java/org/apache/twill/yarn/TwillTester.java |   4 +
 .../org/apache/twill/yarn/YarnTestSuite.java    |  32 +--
 twill-zookeeper/pom.xml                         |   2 +-
 .../internal/zookeeper/NamespaceZKClient.java   |  33 ++-
 .../zookeeper/SettableOperationFuture.java      |   2 +-
 .../apache/twill/zookeeper/ZKClientTest.java    |  75 ++++++-
 64 files changed, 1630 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/87e2b465/pom.xml
----------------------------------------------------------------------


[16/22] incubator-twill git commit: (TWILL-151) Improve Logging error when fetching message after Kafka server is stopped

Posted by ch...@apache.org.
(TWILL-151) Improve Logging error when fetching message after Kafka server is stopped

This closes #65 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/323e5af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/323e5af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/323e5af4

Branch: refs/heads/site
Commit: 323e5af4e536f3b5f306a5c59cd6787bf3a99818
Parents: b740da4
Author: sanojkodikkara <sa...@gmail.com>
Authored: Wed Sep 30 16:23:08 2015 +0530
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jan 6 10:55:36 2016 -0800

----------------------------------------------------------------------
 .../internal/kafka/client/SimpleKafkaConsumer.java   | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/323e5af4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 8cfe889..0299e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -39,6 +40,7 @@ import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
+
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.kafka.client.BrokerInfo;
@@ -49,6 +51,7 @@ import org.apache.twill.kafka.client.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.ConnectException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Iterator;
 import java.util.List;
@@ -374,9 +377,15 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
           invokeCallback(messages, offset);
           backoff.reset();
         } catch (Throwable t) {
-          if (running || !(t instanceof ClosedByInterruptException)) {
-            // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
-            LOG.info("Exception when fetching message on {}.", topicPart, t);
+          // Only log if it is still running, otherwise, it is just the interrupt caused by the stop.
+          if (!running) {
+            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
+          } else {
+            if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
+              LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
+            } else {
+              LOG.info("Exception when fetching message on {}.", topicPart, t);
+            }
             backoff.backoff();
           }
           consumers.refresh(consumerEntry.getKey());
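
The branching introduced in the SimpleKafkaConsumer change above reduces to a small decision table: which log level to use and whether to back off. The sketch below restates it as a pure function so it can be checked without Kafka or SLF4J. FetchErrorLogSketch, Level, levelFor and shouldBackoff are hypothetical names for this illustration only.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;

// Hypothetical restatement of the fetch error handling decision; not the Twill class.
final class FetchErrorLogSketch {

  enum Level { DEBUG, INFO }

  // DEBUG when the consumer itself is stopping, or when the failure looks like the
  // broker going away (interrupted channel or refused connection); INFO otherwise.
  static Level levelFor(boolean running, Throwable t) {
    if (!running) {
      return Level.DEBUG;
    }
    if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
      return Level.DEBUG;
    }
    return Level.INFO;
  }

  // In the patch, backoff happens only while the consumer is still running.
  static boolean shouldBackoff(boolean running) {
    return running;
  }

  public static void main(String[] args) {
    System.out.println(levelFor(false, new IOException("any failure")));
    System.out.println(levelFor(true, new ConnectException("connection refused")));
    System.out.println(levelFor(true, new IOException("unexpected failure")));
  }
}
```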


[09/22] incubator-twill git commit: Added missing test to YarnTestSuite.

Posted by ch...@apache.org.
Added missing test to YarnTestSuite.

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/161032a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/161032a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/161032a2

Branch: refs/heads/site
Commit: 161032a209535bf08265df7107a345b15592d886
Parents: e4a3676
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 23 14:09:24 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 23 14:09:24 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/yarn/YarnTestSuite.java    | 32 +++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/161032a2/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index d9efe3c..bea99d0 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -25,21 +25,23 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-                      PlacementPolicyTestRun.class,
-                      EchoServerTestRun.class,
-                      ResourceReportTestRun.class,
-                      TaskCompletedTestRun.class,
-                      DistributeShellTestRun.class,
-                      LocalFileTestRun.class,
-                      FailureRestartTestRun.class,
-                      ProvisionTimeoutTestRun.class,
-                      LogHandlerTestRun.class,
-                      SessionExpireTestRun.class,
-                      ServiceDiscoveryTestRun.class,
-                      DebugTestRun.class,
-                      ContainerSizeTestRun.class,
-                      InitializeFailTestRun.class
-                    })
+  ContainerSizeTestRun.class,
+  DebugTestRun.class,
+  DistributeShellTestRun.class,
+  EchoServerTestRun.class,
+  EnvironmentTestRun.class,
+  FailureRestartTestRun.class,
+  InitializeFailTestRun.class,
+  LocalFileTestRun.class,
+  LogHandlerTestRun.class,
+  LogLevelTestRun.class,
+  PlacementPolicyTestRun.class,
+  ProvisionTimeoutTestRun.class,
+  ResourceReportTestRun.class,
+  ServiceDiscoveryTestRun.class,
+  SessionExpireTestRun.class,
+  TaskCompletedTestRun.class
+})
 public final class YarnTestSuite extends BaseYarnTest {
 
 }


[11/22] incubator-twill git commit: (TWILL-156) use Files.move instead of File.renameTo so we can have options to replace existing files and perform atomic move, this allows us to support windows rename

Posted by ch...@apache.org.
(TWILL-156) use Files.move instead of File.renameTo so we can have options to replace existing files and perform atomic move, this allows us to support windows rename

This closes #72 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/359b12b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/359b12b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/359b12b9

Branch: refs/heads/site
Commit: 359b12b904e0c52c54453c6afc5ef578d484c0a5
Parents: 87b063c
Author: shankar <sh...@cask.co>
Authored: Fri Dec 4 15:46:59 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Dec 4 23:56:58 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/twill/filesystem/LocalLocation.java  | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/359b12b9/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index a560694..a873545 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -176,9 +179,11 @@ final class LocalLocation implements Location {
   @Override
   public Location renameTo(Location destination) throws IOException {
     // destination will always be of the same type as this location
-    boolean success = file.renameTo(((LocalLocation) destination).file);
-    if (success) {
-      return new LocalLocation(locationFactory, ((LocalLocation) destination).file);
+    Path target = Files.move(file.toPath(), ((LocalLocation) destination).file.toPath(),
+                             StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+
+    if (target != null) {
+      return new LocalLocation(locationFactory, target.toFile());
     } else {
       return null;
     }
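
Note on the Files.move call in the diff above: per the java.nio.file
Javadoc, Files.move either returns the target path or throws an IOException,
so the "target != null" check appears to be always true. The Javadoc also
states that when ATOMIC_MOVE is given all other options are ignored, and
whether an existing target is replaced is implementation specific. The
following is a minimal sketch of the same idea, not the code in this commit;
the class name AtomicRenameSketch and the method rename are hypothetical, and
the fallback to a non-atomic move is an assumption about what a caller might
want.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, for illustration only.
final class AtomicRenameSketch {

  // Tries an atomic move first; falls back to a plain replacing move
  // if the file system cannot move atomically between the two paths.
  public static Path rename(Path source, Path destination) throws IOException {
    try {
      return Files.move(source, destination, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      return Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("rename-sketch");
    Path src = Files.write(dir.resolve("src.txt"), "hello".getBytes(StandardCharsets.UTF_8));
    Path dst = rename(src, dir.resolve("dst.txt"));
    // The source should be gone and the destination present after the move.
    System.out.println("source exists: " + Files.exists(src));
    System.out.println("target exists: " + Files.exists(dst));
  }
}
```

Failures surface as exceptions rather than as a null return, so a caller
would handle IOException instead of testing the result for null.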


[03/22] incubator-twill git commit: (TWILL-153) Honor actual resource size of the container

Posted by ch...@apache.org.
(TWILL-153) Honor actual resource size of the container

- Determine the -Xmx based on the actual size of the container
  - it can be smaller or bigger than the one requested in the TwillSpec
- Refactor resource specification for AM
  - A forward-looking change for TWILL-90
- Simple code cleanup to get rid of code warnings from the IDE.
- Fix an easy-to-fail test - ZKClientTest.testExpireRewatch()

This closes #63 on GitHub

Signed-off-by: Terence Yim <ch...@apache.org>
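
To illustrate the -Xmx computation described in the commit message, here is
a small worked sketch. It copies the arithmetic of
Resources.computeMaxHeapSize from the diff further down and uses 0.7 as in
Constants.HEAP_MIN_RATIO. The class name HeapSizeSketch, the container sizes
(1024 MB and 256 MB) and the 150 MB reserved memory are sample values chosen
for this example, not values taken from a real deployment. Note that the
ratio compared in the code is (container - nonHeap) / container, i.e. heap
relative to total container memory.

```java
// Hypothetical stand-alone sketch; mirrors Resources.computeMaxHeapSize.
final class HeapSizeSketch {

  public static int computeMaxHeapSize(int containerMemory, int nonHeapMemory, double minHeapRatio) {
    if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= minHeapRatio) {
      // Large enough container: subtract the reserved non-heap memory.
      return containerMemory - nonHeapMemory;
    }
    // Small container: give the heap the minimum ratio of the container.
    return (int) Math.ceil(containerMemory * minHeapRatio);
  }

  public static void main(String[] args) {
    // (1024 - 150) / 1024 is about 0.85, which is >= 0.7, so heap = 1024 - 150.
    System.out.println("-Xmx" + computeMaxHeapSize(1024, 150, 0.7d) + "m"); // -Xmx874m
    // (256 - 150) / 256 is about 0.41, which is < 0.7, so heap = ceil(256 * 0.7).
    System.out.println("-Xmx" + computeMaxHeapSize(256, 150, 0.7d) + "m");  // -Xmx180m
  }
}
```

Because the input is now the memory of the container actually granted by
YARN, a container that comes back larger or smaller than the TwillSpec
request gets a heap sized to what it really has.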


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/66402b4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/66402b4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/66402b4f

Branch: refs/heads/site
Commit: 66402b4f234290194e6c9c8de3d7edf84aad22ff
Parents: d2d9081
Author: Terence Yim <ch...@apache.org>
Authored: Tue Sep 22 14:35:26 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Sep 23 22:34:45 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java    |  5 +-
 .../apache/twill/internal/ContainerInfo.java    | 15 ++--
 .../apache/twill/internal/ProcessLauncher.java  |  2 +-
 .../twill/internal/ResourceCapability.java      | 34 ++++++++
 .../twill/internal/TwillContainerLauncher.java  | 19 ++---
 .../internal/json/TwillRunResourcesCodec.java   | 14 ++--
 .../apache/twill/internal/utils/Resources.java  | 46 +++++++++++
 .../internal/yarn/Hadoop20YarnAppClient.java    | 23 ++++--
 .../internal/yarn/Hadoop21YarnAppClient.java    | 24 ++++--
 .../appmaster/ApplicationMasterInfo.java        | 63 +++++++++++++++
 .../ApplicationMasterProcessLauncher.java       | 24 ++----
 .../appmaster/ApplicationMasterService.java     | 24 ++++--
 .../appmaster/ApplicationSubmitter.java         |  3 +-
 .../yarn/AbstractYarnProcessLauncher.java       |  3 +-
 .../twill/internal/yarn/YarnAppClient.java      | 11 +--
 .../apache/twill/yarn/YarnTwillPreparer.java    | 14 ++--
 .../apache/twill/yarn/ContainerSizeTestRun.java | 83 +++++++++++++++++++-
 .../apache/twill/zookeeper/ZKClientTest.java    | 20 +++--
 18 files changed, 339 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index defd013..f897bfa 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -37,14 +37,13 @@ public final class Constants {
    */
   public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
 
+  public static final double HEAP_MIN_RATIO = 0.7d;
+
   /** Memory size of AM. */
   public static final int APP_MASTER_MEMORY_MB = 512;
 
   public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
 
-  public static final String STDOUT = "stdout";
-  public static final String STDERR = "stderr";
-
   public static final String CLASSPATH = "classpath";
   public static final String APPLICATION_CLASSPATH = "application-classpath";
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
index 67c21d3..5c93ede 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
@@ -22,15 +22,20 @@ import java.net.InetAddress;
 /**
  * Represents information of the container that the processing is/will be running in.
  */
-public interface ContainerInfo {
+public interface ContainerInfo extends ResourceCapability {
 
+  /**
+   * Returns the ID of the container.
+   */
   String getId();
 
+  /**
+   * Returns the host information of the container.
+   */
   InetAddress getHost();
 
+  /**
+   * Returns the port for communicating to the container host.
+   */
   int getPort();
-
-  int getMemoryMB();
-
-  int getVirtualCores();
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
index d0f289b..beb8e6b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -26,7 +26,7 @@ import java.util.Map;
  *
  * @param <T> Type of the object that contains information about the container that the process is going to launch.
  */
-public interface ProcessLauncher<T> {
+public interface ProcessLauncher<T extends ResourceCapability> {
 
   /**
    * Returns information about the container that this launch would launch process in.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
new file mode 100644
index 0000000..2cdd080
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Represents information about compute resources capability.
+ */
+public interface ResourceCapability {
+
+  /**
+   * Returns memory size in MB.
+   */
+  int getMemoryMB();
+
+  /**
+   * Returns the number of virtual cpu cores.
+   */
+  int getVirtualCores();
+}
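
The bound added to ProcessLauncher (T extends ResourceCapability) means any
code holding a launcher can read the memory and core counts of whatever the
launcher describes, whether that is a container or the application master.
A minimal sketch of that pattern follows. All names here
(BoundedLauncherSketch, Launcher, FixedInfo, fixed, describe) are
hypothetical stand-ins written for this example; only the two getters match
the ResourceCapability interface in the diff above.

```java
// Hypothetical stand-alone sketch of a type parameter bounded by a capability interface.
final class BoundedLauncherSketch {

  // Same two getters as ResourceCapability in the diff.
  interface ResourceCapability {
    int getMemoryMB();
    int getVirtualCores();
  }

  // Simplified stand-in for ProcessLauncher<T extends ResourceCapability>.
  interface Launcher<T extends ResourceCapability> {
    T getContainerInfo();
  }

  static final class FixedInfo implements ResourceCapability {
    private final int memoryMB;
    private final int virtualCores;

    FixedInfo(int memoryMB, int virtualCores) {
      this.memoryMB = memoryMB;
      this.virtualCores = virtualCores;
    }

    @Override
    public int getMemoryMB() {
      return memoryMB;
    }

    @Override
    public int getVirtualCores() {
      return virtualCores;
    }
  }

  // Builds a launcher that always reports the given resources.
  public static Launcher<FixedInfo> fixed(final int memoryMB, final int virtualCores) {
    return new Launcher<FixedInfo>() {
      @Override
      public FixedInfo getContainerInfo() {
        return new FixedInfo(memoryMB, virtualCores);
      }
    };
  }

  // Works for any launcher, because the bound guarantees the getters exist.
  public static String describe(Launcher<? extends ResourceCapability> launcher) {
    ResourceCapability info = launcher.getContainerInfo();
    return info.getMemoryMB() + " MB, " + info.getVirtualCores() + " vcores";
  }

  public static void main(String[] args) {
    System.out.println(describe(fixed(512, 1)));
  }
}
```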

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0667bb4..d01db4b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -27,6 +27,7 @@ import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.launcher.FindFreePort;
 import org.apache.twill.launcher.TwillLauncher;
 import org.apache.twill.zookeeper.NodeData;
@@ -46,9 +47,8 @@ public final class TwillContainerLauncher {
 
   private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
 
-  private static final double HEAP_MIN_RATIO = 0.7d;
-
   private final RuntimeSpecification runtimeSpec;
+  private final ContainerInfo containerInfo;
   private final ProcessLauncher.PrepareLaunchContext launchContext;
   private final ZKClient zkClient;
   private final int instanceCount;
@@ -56,10 +56,12 @@ public final class TwillContainerLauncher {
   private final int reservedMemory;
   private final Location secureStoreLocation;
 
-  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
+  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
+                                ProcessLauncher.PrepareLaunchContext launchContext,
                                 ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory,
                                 Location secureStoreLocation) {
     this.runtimeSpec = runtimeSpec;
+    this.containerInfo = containerInfo;
     this.launchContext = launchContext;
     this.zkClient = zkClient;
     this.instanceCount = instanceCount;
@@ -98,15 +100,6 @@ public final class TwillContainerLauncher {
       LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
     }
 
-    int memory = runtimeSpec.getResourceSpecification().getMemorySize();
-    if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
-      // Reduce -Xmx by the reserved memory size.
-      memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
-    } else {
-      // If it is a small VM, just discount it by the min ratio.
-      memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
-    }
-
     // Currently no reporting is supported for runnable containers
     launchContext
       .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId())
@@ -135,6 +128,8 @@ public final class TwillContainerLauncher {
     } else {
       firstCommand = "$JAVA_HOME/bin/java";
     }
+
+    int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, Constants.HEAP_MIN_RATIO);
     commandBuilder.add("-Djava.io.tmpdir=tmp",
                        "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
                        "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index 9a6f555..7dea371 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -35,13 +35,13 @@ import java.lang.reflect.Type;
  */
 public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
                                               JsonDeserializer<TwillRunResources> {
-  private final String CONTAINER_ID = "containerId";
-  private final String INSTANCE_ID = "instanceId";
-  private final String HOST = "host";
-  private final String MEMORY_MB = "memoryMB";
-  private final String VIRTUAL_CORES = "virtualCores";
-  private final String DEBUG_PORT = "debugPort";
-  private final String LOG_LEVEL = "logLevel";
+  private static final String CONTAINER_ID = "containerId";
+  private static final String INSTANCE_ID = "instanceId";
+  private static final String HOST = "host";
+  private static final String MEMORY_MB = "memoryMB";
+  private static final String VIRTUAL_CORES = "virtualCores";
+  private static final String DEBUG_PORT = "debugPort";
+  private static final String LOG_LEVEL = "logLevel";
 
   @Override
   public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
new file mode 100644
index 0000000..2ebb789
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+/**
+ * Utility class to help adjusting container resource requirement.
+ */
+public final class Resources {
+
+  /**
+   * Computes the max heap size for a JVM process.
+   *
+   * @param containerMemory memory in MB of the container memory.
+   *                        It is the maximum memory size allowed for the process.
+   * @param nonHeapMemory memory in MB that needs to be reserved for non JVM heap memory for the process.
+   * @param minHeapRatio minimum ratio for heap to non-heap memory.
+   * @return memory in MB representing the max heap size for a JVM process.
+   */
+  public static int computeMaxHeapSize(int containerMemory, int nonHeapMemory, double minHeapRatio) {
+    if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= minHeapRatio) {
+      // Reduce -Xmx by the reserved memory size.
+      return containerMemory - nonHeapMemory;
+    } else {
+      // If it is a small VM, just discount it by the min ratio.
+      return (int) Math.ceil(containerMemory * minHeapRatio);
+    }
+  }
+
+  private Resources() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 3afa49a..bfa494c 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
 import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
@@ -70,8 +72,8 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
-                                                       @Nullable String schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+                                                               @Nullable String schedulerQueue) throws Exception {
     // Request for new application
     final GetNewApplicationResponse response = yarnClient.getNewApplication();
     final ApplicationId appId = response.getApplicationId();
@@ -86,10 +88,17 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
       appSubmissionContext.setQueue(schedulerQueue);
     }
 
+    // TODO: Make it adjustable through TwillSpec (TWILL-90)
+    // Set the resource requirement for AM
+    Resource amResource = Records.newRecord(Resource.class);
+    amResource.setMemory(Constants.APP_MASTER_MEMORY_MB);
+    final Resource capability = adjustMemory(response, amResource);
+    ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), 1);
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
 
       @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext) {
         ContainerLaunchContext context = launchContext.getLaunchContext();
         addRMToken(context);
         context.setUser(appSubmissionContext.getUser());
@@ -106,7 +115,7 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
       }
     };
 
-    return new ApplicationMasterProcessLauncher(appId, submitter);
+    return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
   }
 
   private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
@@ -158,9 +167,9 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                       TwillSpecification twillSpec,
-                                                       @Nullable String schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                               TwillSpecification twillSpec,
+                                                               @Nullable String schedulerQueue) throws Exception {
     this.user = user;
     return createLauncher(twillSpec, schedulerQueue);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 046e3f1..6fd11e5 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
 import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
@@ -64,8 +66,8 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
-                                                       @Nullable String schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+                                                               @Nullable String schedulerQueue) throws Exception {
     // Request for new application
     YarnClientApplication application = yarnClient.createApplication();
     final GetNewApplicationResponse response = application.getNewApplicationResponse();
@@ -80,14 +82,20 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
       appSubmissionContext.setQueue(schedulerQueue);
     }
 
+    // TODO: Make it adjustable through TwillSpec (TWILL-90)
+    // Set the resource requirement for AM
+    final Resource capability = adjustMemory(response, Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1));
+    ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(),
+                                                                    capability.getVirtualCores());
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
       @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context) {
         ContainerLaunchContext launchContext = context.getLaunchContext();
 
         addRMToken(launchContext);
         appSubmissionContext.setAMContainerSpec(launchContext);
-        appSubmissionContext.setResource(adjustMemory(response, capability));
+        appSubmissionContext.setResource(capability);
         appSubmissionContext.setMaxAppAttempts(2);
 
         try {
@@ -100,7 +108,7 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
       }
     };
 
-    return new ApplicationMasterProcessLauncher(appId, submitter);
+    return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
   }
 
   private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
@@ -139,9 +147,9 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                       TwillSpecification twillSpec,
-                                                       @Nullable String schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                               TwillSpecification twillSpec,
+                                                               @Nullable String schedulerQueue) throws Exception {
     // Ignore user
     return createLauncher(twillSpec, schedulerQueue);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
new file mode 100644
index 0000000..fbeeca0
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.twill.internal.ResourceCapability;
+
+/**
+ * Represents information of the application master.
+ */
+public class ApplicationMasterInfo implements ResourceCapability {
+
+  private final ApplicationId appId;
+  private final int memoryMB;
+  private final int virtualCores;
+
+  public ApplicationMasterInfo(ApplicationId appId, int memoryMB, int virtualCores) {
+    this.appId = appId;
+    this.memoryMB = memoryMB;
+    this.virtualCores = virtualCores;
+  }
+
+  /**
+   * Returns the application ID for the YARN application.
+   */
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return virtualCores;
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationMasterInfo{" +
+      "appId=" + appId +
+      ", memoryMB=" + memoryMB +
+      ", virtualCores=" + virtualCores +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
index 126ff97..da11816 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
@@ -19,38 +19,30 @@ package org.apache.twill.internal.appmaster;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
 import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.twill.internal.yarn.YarnUtils;
 
 import java.util.Map;
 
 /**
  * A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client.
  */
-public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> {
+public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationMasterInfo> {
 
   private final ApplicationSubmitter submitter;
 
-  public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) {
-    super(appId);
+  public ApplicationMasterProcessLauncher(ApplicationMasterInfo info, ApplicationSubmitter submitter) {
+    super(info);
     this.submitter = submitter;
   }
 
   @Override
   @SuppressWarnings("unchecked")
   protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
-    final ApplicationId appId = getContainerInfo();
-
-    // Set the resource requirement for AM
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
-    YarnUtils.setVirtualCores(capability, 1);
+    ApplicationMasterInfo appMasterInfo = getContainerInfo();
+    ApplicationId appId = appMasterInfo.getAppId();
 
     // Put in extra environments
     Map<String, String> env = ImmutableMap.<String, String>builder()
@@ -58,11 +50,11 @@ public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessL
       .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
       .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp()))
       .put(EnvKeys.YARN_APP_ID_STR, appId.toString())
-      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB))
-      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability)))
+      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(appMasterInfo.getMemoryMB()))
+      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(appMasterInfo.getVirtualCores()))
       .build();
 
     launchContext.setEnvironment(env);
-    return (ProcessController<R>) submitter.submit(launchContext, capability);
+    return (ProcessController<R>) submitter.submit(launchContext);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 818db05..355cea3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -176,10 +176,14 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   }
 
   @SuppressWarnings("unchecked")
+  @Nullable
   private EventHandler createEventHandler(TwillSpecification twillSpec) {
     try {
       // Should be able to load by this class ClassLoader, as they packaged in the same jar.
       EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+      if (handlerSpec == null) {
+        return null;
+      }
 
       Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
       Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
@@ -221,7 +225,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
 
     // initialize the event handler, if it fails, it will fail the application.
-    eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+    if (eventHandler != null) {
+      eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+    }
 
     instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
@@ -237,11 +243,13 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
 
-    try {
-      // call event handler destroy. If there is error, only log and not affected stop sequence.
-      eventHandler.destroy();
-    } catch (Throwable t) {
-      LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t);
+    if (eventHandler != null) {
+      try {
+        // call event handler destroy. If there is error, only log and not affected stop sequence.
+        eventHandler.destroy();
+      } catch (Throwable t) {
+        LOG.warn("Exception when calling {}.destroy()", eventHandler.getClass().getName(), t);
+      }
     }
 
     instanceChangeExecutor.shutdownNow();
@@ -491,7 +499,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       }
     }
 
-    if (!timeoutEvents.isEmpty()) {
+    if (!timeoutEvents.isEmpty() && eventHandler != null) {
       try {
         EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents);
         if (action.getTimeout() < 0) {
@@ -640,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       );
 
       TwillContainerLauncher launcher = new TwillContainerLauncher(
-        twillSpec.getRunnables().get(runnableName), launchContext,
+        twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
         containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
index 38f90ae..e82dbbc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.twill.internal.appmaster;
 
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.internal.yarn.YarnLaunchContext;
@@ -27,5 +26,5 @@ import org.apache.twill.internal.yarn.YarnLaunchContext;
  */
 public interface ApplicationSubmitter {
 
-  ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability);
+  ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
index 2023b0e..eb917f3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.ResourceCapability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ import java.util.Map;
  *
  * @param <T> Type of the object that contains information about the container that the process is going to launch.
  */
-public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+public abstract class AbstractYarnProcessLauncher<T extends ResourceCapability> implements ProcessLauncher<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 67a2292..df956bf 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 
 import java.util.List;
 import javax.annotation.Nullable;
@@ -36,8 +37,8 @@ public interface YarnAppClient extends Service {
    * Creates a {@link ProcessLauncher} for launching the application represented by the given spec. If scheduler queue
    * is available and is supported by the YARN cluster, it will be launched in the given queue.
    */
-  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
-                                                @Nullable String schedulerQueue) throws Exception;
+  ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+                                                        @Nullable String schedulerQueue) throws Exception;
 
   /**
    * Creates a {@link ProcessLauncher} for launching application with the given user and spec. If scheduler queue
@@ -46,9 +47,9 @@ public interface YarnAppClient extends Service {
    * @deprecated This method will get removed.
    */
   @Deprecated
-  ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                TwillSpecification twillSpec,
-                                                @Nullable String schedulerQueue) throws Exception;
+  ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                        TwillSpecification twillSpec,
+                                                        @Nullable String schedulerQueue) throws Exception;
 
   /**
    * Creates a {@link ProcessController} that can controls an application represented by the given application id.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 4e9f76d..a444dda 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -40,7 +40,6 @@ import com.google.gson.GsonBuilder;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.ClassAcceptor;
 import org.apache.twill.api.EventHandlerSpecification;
@@ -68,6 +67,7 @@ import org.apache.twill.internal.LogOnlyEventHandler;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterMain;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.json.ArgumentsCodec;
@@ -76,6 +76,7 @@ import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillSpecificationAdapter;
 import org.apache.twill.internal.utils.Dependencies;
 import org.apache.twill.internal.utils.Paths;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.internal.yarn.YarnUtils;
@@ -283,8 +284,8 @@ final class YarnTwillPreparer implements TwillPreparer {
   @Override
   public TwillController start() {
     try {
-      final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
-      final ApplicationId appId = launcher.getContainerInfo();
+      final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
+      final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
       Callable<ProcessController<YarnApplicationReport>> submitTask =
         new Callable<ProcessController<YarnApplicationReport>>() {
         @Override
@@ -310,7 +311,7 @@ final class YarnTwillPreparer implements TwillPreparer {
                                                      Constants.Files.LAUNCHER_JAR,
                                                      Constants.Files.ARGUMENTS));
 
-          LOG.debug("Submit AM container spec: {}", appId);
+          LOG.debug("Submit AM container spec: {}", appMasterInfo);
           // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
           //     org.apache.twill.internal.TwillLauncher
           //     appMaster.jar
@@ -329,6 +330,9 @@ final class YarnTwillPreparer implements TwillPreparer {
             LOG.debug("Log level is set to {} for the Twill application.", logLevel);
             builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
           }
+
+          int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+                                                    Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
           return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials)
             .addCommand(
               "$JAVA_HOME/bin/java",
@@ -336,7 +340,7 @@ final class YarnTwillPreparer implements TwillPreparer {
               "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
               "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
               "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
-              "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
+              "-Xmx" + memory + "m",
               extraOptions == null ? "" : extraOptions,
               TwillLauncher.class.getName(),
               Constants.Files.APP_MASTER_JAR,
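
The hunk above replaces the fixed "-Xmx" value with the result of Resources.computeMaxHeapSize, whose body is not shown in this message. The following is only a sketch of one plausible way to compute it, under the assumption that the reserved memory is subtracted when the remaining heap still meets the minimum ratio, and that the ratio is applied otherwise. The class name HeapSizeSketch and its method body are hypothetical; the two constant values are copied from the Constants class in this thread.

```java
/** Illustrative sketch only; this is not the Twill implementation. */
final class HeapSizeSketch {

  // Values copied from org.apache.twill.internal.Constants as shown in this thread.
  static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
  static final double HEAP_MIN_RATIO = 0.7d;

  /**
   * Hypothetical stand-in for Resources.computeMaxHeapSize (assumed behavior).
   * Subtracts the reserved memory when the remainder is at least minHeapRatio
   * of the container; otherwise falls back to containerMemoryMB * minHeapRatio.
   */
  static int computeMaxHeapSize(int containerMemoryMB, int reservedMemoryMB, double minHeapRatio) {
    int afterReserve = containerMemoryMB - reservedMemoryMB;
    if ((double) afterReserve / containerMemoryMB >= minHeapRatio) {
      return afterReserve;
    }
    // Small container: reserving the full amount would leave too little heap.
    return (int) Math.ceil(containerMemoryMB * minHeapRatio);
  }

  public static void main(String[] args) {
    for (int containerMB : new int[] {128, 512, 1024}) {
      int heapMB = computeMaxHeapSize(containerMB, APP_MASTER_RESERVED_MEMORY_MB, HEAP_MIN_RATIO);
      System.out.printf("container=%dMB -> -Xmx%dm%n", containerMB, heapMB);
    }
  }
}
```

Under these assumptions a 512MB container yields -Xmx362m, the same value as the removed expression (APP_MASTER_MEMORY_MB - APP_MASTER_RESERVED_MEMORY_MB), while a 128MB container yields -Xmx90m instead of a negative number.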

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index 6e27b69..c6f7b9a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -29,6 +29,8 @@ import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.discovery.ServiceDiscovered;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
 import java.util.concurrent.ExecutionException;
@@ -36,11 +38,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * Test for requesting different container size in different order.
- * It specifically test for workaround for YARN-314.
+ * Tests related to different container sizes.
  */
 public class ContainerSizeTestRun extends BaseYarnTest {
 
+  /**
+   * Tests requesting containers of different sizes in different orders.
+   * It specifically tests the workaround for YARN-314.
+   */
   @Test
   public void testContainerSize() throws InterruptedException, TimeoutException, ExecutionException {
     TwillRunner runner = getTwillRunner();
@@ -56,6 +61,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
     }
   }
 
+  @Test
+  public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
+    TwillRunner runner = getTwillRunner();
+    TwillController controller = runner.prepare(new MaxHeapApp())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    try {
+      ServiceDiscovered discovered = controller.discoverService("sleep");
+      Assert.assertTrue(waitForSize(discovered, 1, 120));
+    } finally {
+      controller.terminate().get(120, TimeUnit.SECONDS);
+    }
+  }
 
   /**
    * An application that has two runnables with different memory size.
@@ -86,7 +105,6 @@ public class ContainerSizeTestRun extends BaseYarnTest {
     }
   }
 
-
   /**
    * A runnable that sleep for 120 seconds.
    */
@@ -116,4 +134,63 @@ public class ContainerSizeTestRun extends BaseYarnTest {
       }
     }
   }
+
+  /**
+   * An application for testing max heap size.
+   */
+  public static final class MaxHeapApp implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      // Make the runnable request for container smaller than 128MB (the allocation minimum)
+      ResourceSpecification res = ResourceSpecification.Builder.with()
+        .setVirtualCores(1)
+        .setMemory(16, ResourceSpecification.SizeUnit.MEGA)
+        .build();
+
+      return TwillSpecification.Builder.with()
+        .setName("MaxHeapApp")
+        .withRunnable()
+        .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * The runnable for testing max heap size.
+   */
+  public static final class MaxHeapRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MaxHeapRunnable.class);
+    private volatile Thread runThread;
+
+    public MaxHeapRunnable(int port) {
+      super(ImmutableMap.of("port", Integer.toString(port)));
+    }
+
+    @Override
+    public void run() {
+      // The max heap size should exceed 16MB, since the minimum allocation size is 128MB
+      if (Runtime.getRuntime().maxMemory() <= 16 * 1024 * 1024) {
+        LOG.error("Memory size is too small: {}", Runtime.getRuntime().maxMemory());
+        return;
+      }
+
+      runThread = Thread.currentThread();
+      getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port")));
+      try {
+        TimeUnit.SECONDS.sleep(120);
+      } catch (InterruptedException e) {
+        // Ignore.
+      }
+    }
+
+    @Override
+    public void stop() {
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 162d4db..a9120c3 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -30,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -188,12 +189,21 @@ public class ZKClientTest {
       client.startAndWait();
 
       try {
-        final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<Watcher.Event.EventType>();
+        final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>();
         client.exists("/expireRewatch", new Watcher() {
           @Override
-          public void process(WatchedEvent event) {
-            client.exists("/expireRewatch", this);
-            events.add(event.getType());
+          public void process(final WatchedEvent event) {
+            Futures.addCallback(client.exists("/expireRewatch", this), new FutureCallback<Stat>() {
+              @Override
+              public void onSuccess(Stat result) {
+                events.add(event.getType());
+              }
+
+              @Override
+              public void onFailure(Throwable t) {
+                LOG.error("Failed to call exists on /expireRewatch", t);
+              }
+            });
           }
         });
 
@@ -309,7 +319,7 @@ public class ZKClientTest {
       Assert.assertEquals("digest", acl.getId().getScheme());
       Assert.assertEquals(digest, acl.getId().getId());
 
-      Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData()));
+      Assert.assertArrayEquals("test".getBytes(), noAuthClient.getData(path).get().getData());
 
       // When tries to write using the no-auth zk client, it should fail.
       try {
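
The last hunk above switches the assertion from decoding the node data into a String to comparing raw bytes. Both new String(bytes) and "test".getBytes() rely on the platform default charset, so a charset-explicit comparison may be preferable where the encoding matters. The following is a minimal sketch using only the JDK; the class ByteCompareSketch and the helper sameBytes are hypothetical names and are not part of the test.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative sketch only; names here are hypothetical. */
final class ByteCompareSketch {

  /** Returns true if actual holds exactly the UTF-8 encoding of expected. */
  static boolean sameBytes(String expected, byte[] actual) {
    return Arrays.equals(expected.getBytes(StandardCharsets.UTF_8), actual);
  }

  public static void main(String[] args) {
    byte[] stored = "test".getBytes(StandardCharsets.UTF_8);
    System.out.println(sameBytes("test", stored));
    System.out.println(sameBytes("tesT", stored));
  }
}
```

In a JUnit test the same idea would read as Assert.assertArrayEquals("test".getBytes(StandardCharsets.UTF_8), data), which keeps the array comparison from the hunk and pins the encoding.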



[08/22] incubator-twill git commit: (TWILL-131) Remove ZK node when application finished.

Posted by ch...@apache.org.
(TWILL-131) Remove ZK node when application finished.

- Remove the application ZK node when the application terminates

This closes #70 on GitHub

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e4a36762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e4a36762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e4a36762

Branch: refs/heads/site
Commit: e4a36762e8df34aa4971e29863714f199cb8ddcd
Parents: f6d2b6c
Author: Terence Yim <ch...@apache.org>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 20 08:47:20 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java    | 83 ++++++++++++++++++
 .../twill/internal/AbstractTwillService.java    |  2 +-
 .../internal/AbstractZKServiceController.java   |  3 +-
 .../org/apache/twill/internal/Constants.java    | 76 -----------------
 .../twill/discovery/ZKDiscoveryService.java     |  4 +-
 .../twill/discovery/ZKDiscoveryServiceTest.java |  2 +-
 .../org/apache/twill/internal/ServiceMain.java  | 19 +++--
 .../appmaster/ApplicationMasterMain.java        | 89 +++++++++++++++++++-
 .../appmaster/ApplicationMasterService.java     |  6 +-
 .../internal/container/TwillContainerMain.java  |  5 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |  8 +-
 .../twill/yarn/YarnTwillRunnerService.java      |  4 +-
 .../org/apache/twill/yarn/BaseYarnTest.java     |  4 +
 .../apache/twill/yarn/EchoServerTestRun.java    | 68 +++++++++++++--
 .../java/org/apache/twill/yarn/TwillTester.java |  4 +
 15 files changed, 266 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..dd04eb1
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ */
+public final class Constants {
+
+  public static final String LOG_TOPIC = "log";
+
+  /** Maximum number of seconds for AM to start. */
+  public static final int APPLICATION_MAX_START_SECONDS = 60;
+  /** Maximum number of seconds for AM to stop. */
+  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+  public static final long PROVISION_TIMEOUT = 30000;
+
+  /**
+   * Milliseconds AM should wait for RM to allocate a constrained provision request.
+   * On timeout, AM relaxes the request constraints.
+   */
+  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
+  public static final double HEAP_MIN_RATIO = 0.7d;
+
+  /** Memory size of AM. */
+  public static final int APP_MASTER_MEMORY_MB = 512;
+
+  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+  public static final String CLASSPATH = "classpath";
+  public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+  /** Command names for the restart runnable instances. */
+  public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+  public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
+  /**
+   * Common ZK paths constants
+   */
+  public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
+  public static final String INSTANCES_PATH_PREFIX = "/instances";
+
+
+  /**
+   * Constants for names of internal files that are shared between client, AM and containers.
+   */
+  public static final class Files {
+
+    public static final String LAUNCHER_JAR = "launcher.jar";
+    public static final String APP_MASTER_JAR = "appMaster.jar";
+    public static final String CONTAINER_JAR = "container.jar";
+    public static final String LOCALIZE_FILES = "localizeFiles.json";
+    public static final String TWILL_SPEC = "twillSpec.json";
+    public static final String ARGUMENTS = "arguments.json";
+    public static final String ENVIRONMENTS = "environments.json";
+    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+    public static final String JVM_OPTIONS = "jvm.opts";
+    public static final String CREDENTIALS = "credentials.store";
+
+    private Files() {
+    }
+  }
+
+  private Constants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 2f95e0e..8688d0b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
   }
 
   private String getLiveNodePath() {
-    return "/instances/" + runId.getId();
+    return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
   }
 
   private <T> byte[] toJson(T obj) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 0cf92ea..9b30823 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.ServiceController;
@@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
    * Returns the zookeeper node path for the ephemeral instance node for this runId.
    */
   protected final String getInstancePath() {
-    return String.format("/instances/%s", getRunId().getId());
+    return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
   }
 
   private String getZKPath(String path) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 39de851..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
-  public static final String LOG_TOPIC = "log";
-
-  /** Maximum number of seconds for AM to start. */
-  public static final int APPLICATION_MAX_START_SECONDS = 60;
-  /** Maximum number of seconds for AM to stop. */
-  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
-  public static final long PROVISION_TIMEOUT = 30000;
-
-  /**
-   * Milliseconds AM should wait for RM to allocate a constrained provision request.
-   * On timeout, AM relaxes the request constraints.
-   */
-  public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
-
-  public static final double HEAP_MIN_RATIO = 0.7d;
-
-  /** Memory size of AM. */
-  public static final int APP_MASTER_MEMORY_MB = 512;
-
-  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
-  public static final String CLASSPATH = "classpath";
-  public static final String APPLICATION_CLASSPATH = "application-classpath";
-
-  /** Command names for the restart runnable instances. */
-  public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
-  public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
-
-  /**
-   * Constants for names of internal files that are shared between client, AM and containers.
-   */
-  public static final class Files {
-
-    public static final String LAUNCHER_JAR = "launcher.jar";
-    public static final String APP_MASTER_JAR = "appMaster.jar";
-    public static final String CONTAINER_JAR = "container.jar";
-    public static final String LOCALIZE_FILES = "localizeFiles.json";
-    public static final String TWILL_SPEC = "twillSpec.json";
-    public static final String ARGUMENTS = "arguments.json";
-    public static final String ENVIRONMENTS = "environments.json";
-    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
-    public static final String JVM_OPTIONS = "jvm.opts";
-    public static final String CREDENTIALS = "credentials.store";
-
-    private Files() {
-    }
-  }
-
-  private Constants() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 3f0db34..c563bab 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.OperationFuture;
@@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
   private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
-  private static final String NAMESPACE = "/discoverable";
 
   private static final long RETRY_MILLIS = 1000;
 
@@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
    * @param zkClient The {@link ZKClient} for interacting with zookeeper.
    */
   public ZKDiscoveryService(ZKClient zkClient) {
-    this(zkClient, NAMESPACE);
+    this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index 7707c5b..7d6e369 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -63,7 +63,7 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
     zkServer.stopAndWait();
   }
 
-  @Test (timeout = 10000)
+  @Test (timeout = 30000)
   public void testDoubleRegister() throws Exception {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
     DiscoveryService discoveryService = entry.getKey();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6af3d3..cafd375 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -157,12 +157,16 @@ public abstract class ServiceMain {
   /**
    * Creates a {@link ZKClientService}.
    */
-  protected static ZKClientService createZKClient(String zkConnectStr) {
+  protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
     return ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(
-          ZKClientService.Builder.of(zkConnectStr).build(),
-          RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+      ZKClients.namespace(
+        ZKClients.reWatchOnExpire(
+          ZKClients.retryOnFailure(
+            ZKClientService.Builder.of(zkConnectStr).build(),
+            RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
+          )
+        ), "/" + appName
+      ));
   }
 
   private void configureLogger() {
@@ -256,10 +260,11 @@ public abstract class ServiceMain {
   /**
    * A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
    */
-  protected static final class TwillZKPathService extends AbstractIdleService {
+  protected static class TwillZKPathService extends AbstractIdleService {
+
+    protected static final long TIMEOUT_SECONDS = 5L;
 
     private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
-    private static final long TIMEOUT_SECONDS = 5L;
 
     private final ZKClient zkClient;
     private final String path;
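
Note on the `createZKClient` change above: wrapping the client with `ZKClients.namespace(..., "/" + appName)` means every path the application master and the containers use is resolved under a per-application root. The following is a minimal sketch of that path mapping only. It does not use Twill's classes, and the class and method names (`NamespacePathSketch`, `toNamespaced`, `forApp`) are hypothetical. The handling of "/" as the namespace node itself is an assumption, chosen to match the `delete("/")` call and the `/EchoServer` check in the test added by this commit.

```java
// Illustrative only: a stand-in for how a namespaced ZK client could map paths.
// None of these names come from Twill; they are assumptions for this sketch.
final class NamespacePathSketch {

  private NamespacePathSketch() { }

  // Maps a client-side path to the real path under the given namespace.
  // Assumption: "/" refers to the namespace node itself.
  static String toNamespaced(String namespace, String path) {
    return "/".equals(path) ? namespace : namespace + path;
  }

  // Convenience for the "/" + appName namespace used in createZKClient.
  static String forApp(String appName, String path) {
    return toNamespaced("/" + appName, path);
  }
}
```

Under this mapping, a watcher on `/discoverable` registered through the namespaced client for an app named `EchoServer` would observe `/EchoServer/discoverable` relative to the connect string.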

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3e8cb93..38a2463 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
 import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.OperationFuture;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKOperations;
@@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain {
     File twillSpec = new File(Constants.Files.TWILL_SPEC);
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
 
-    ZKClientService zkClientService = createZKClient(zkConnect);
+    ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME));
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     setRMSchedulerAddress(conf);
 
@@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain {
                                                                     twillSpec, amClient, createAppLocation(conf));
     TrackerService trackerService = new TrackerService(service);
 
-    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+    new ApplicationMasterMain(service.getKafkaZKConnect())
       .doMain(
         service,
         new YarnAMClientService(amClient, trackerService),
         zkClientService,
-        new TwillZKPathService(zkClientService, runId),
+        new AppMasterTwillZKPathService(zkClientService, runId),
         new ApplicationKafkaService(zkClientService, runId)
       );
   }
@@ -229,4 +234,82 @@ public final class ApplicationMasterMain extends ServiceMain {
       }
     }
   }
+
+  private static final class AppMasterTwillZKPathService extends TwillZKPathService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
+    private final ZKClient zkClient;
+
+    public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+      super(zkClient, runId);
+      this.zkClient = zkClient;
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      super.shutDown();
+
+      // Deletes ZK nodes created for the application execution.
+      // We don't have to worry about a race condition if another instance of the same app starts at the same time
+      // as when removal is performed. This is because we always create nodes with "createParent == true",
+      // which takes care of the parent node recreation if it is removed from here.
+
+      // Try to delete the /instances path. It may throw NotEmptyException if there are other instances of the
+      // same app running, which we can safely ignore and return.
+      if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
+        return;
+      }
+
+      // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
+      // of the same app running that have discovery services running.
+      List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
+                                      .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+      List<OperationFuture<?>> deleteFutures = new ArrayList<>();
+      for (String child : children) {
+        String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
+        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+        deleteFutures.add(zkClient.delete(path));
+      }
+      Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      for (OperationFuture<?> future : deleteFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (e.getCause() instanceof KeeperException.NotEmptyException) {
+            return;
+          }
+          throw e;
+        }
+      }
+
+      // Delete the /discovery. It may fail with NotEmptyException (due to race between apps),
+      // which we can safely ignore and return.
+      if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
+        return;
+      }
+
+      // Delete the ZK path for the app namespace.
+      delete("/");
+    }
+
+    /**
+     * Deletes the given ZK path.
+     *
+     * @param path path to delete
+     * @return true if the path was deleted, false if failed to delete due to {@link KeeperException.NotEmptyException}.
+     * @throws Exception if failed to delete the path
+     */
+    private boolean delete(String path) throws Exception {
+      try {
+        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+        zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        return true;
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof KeeperException.NotEmptyException) {
+          return false;
+        }
+        throw e;
+      }
+    }
+  }
 }
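
Note on `AppMasterTwillZKPathService.shutDown()` above: the cleanup walks from the leaves toward the namespace root and stops as soon as a delete is refused because the node still has children, which is how a second running instance of the same app keeps the shared nodes alive. The sketch below models that order against an in-memory set of paths rather than a real ZooKeeper. The class `ZkCleanupSketch`, its methods, and the literal paths `/instances` and `/discoverable` are assumptions for illustration (the actual values live in `Constants`, which is not shown here), and the deletes run sequentially rather than through futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: an in-memory model of the cleanup order, not Twill code.
final class ZkCleanupSketch {
  static final String INSTANCES = "/instances";     // assumed value
  static final String DISCOVERY = "/discoverable";  // assumed value

  // All existing node paths, relative to the app namespace ("/" is its root).
  private final Set<String> nodes = new TreeSet<>();

  // Creates a node and any missing parents, like createParent == true.
  void create(String path) {
    nodes.add("/");
    for (int i = path.indexOf('/', 1); i > 0; i = path.indexOf('/', i + 1)) {
      nodes.add(path.substring(0, i));
    }
    nodes.add(path);
  }

  boolean exists(String path) {
    return nodes.contains(path);
  }

  // Returns the names of the direct children of the given node.
  List<String> children(String path) {
    String prefix = "/".equals(path) ? "/" : path + "/";
    List<String> result = new ArrayList<>();
    for (String node : nodes) {
      if (!node.equals(path) && node.startsWith(prefix) && node.indexOf('/', prefix.length()) < 0) {
        result.add(node.substring(prefix.length()));
      }
    }
    return result;
  }

  // Returns false (the NotEmptyException case) if the node still has children.
  boolean delete(String path) {
    if (!children(path).isEmpty()) {
      return false;
    }
    nodes.remove(path);
    return true;
  }

  // Mirrors the order in shutDown(): instances, discovery children, discovery, root.
  boolean cleanup() {
    if (!delete(INSTANCES)) {
      return false;
    }
    boolean allDeleted = true;
    for (String child : children(DISCOVERY)) {
      allDeleted &= delete(DISCOVERY + "/" + child);
    }
    if (!allDeleted || !delete(DISCOVERY)) {
      return false;
    }
    return delete("/");
  }
}
```

With two runs registered under `/instances`, the first `cleanup()` returns false and leaves the root in place; once the second run's node is gone, `cleanup()` removes everything including the root.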

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index e1523d6..c376de4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     // Creates ZK path for runnable
     zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
-    runningContainers.addWatcher("/discoverable");
+    runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
     runnableContainerRequests = initContainerRequests();
   }
 
@@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
       env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
       env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
-      env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+      env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
       env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
 
       ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     return String.format("/%s/runnables/%s", runId.getId(), runnableName);
   }
 
-  private String getKafkaZKConnect() {
+  String getKafkaZKConnect() {
     return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 51837a7..3ea786a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.container;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractService;
@@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.api.TwillRunnableSpecification;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.discovery.ZKDiscoveryService;
@@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain {
     int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
     int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
 
-    ZKClientService zkClientService = createZKClient(zkConnectStr);
+    ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME));
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
     ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index d4edfeb..d04cdab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final YarnConfiguration yarnConfig;
   private final TwillSpecification twillSpec;
   private final YarnAppClient yarnAppClient;
-  private final ZKClient zkClient;
+  private final String zkConnectString;
   private final LocationFactory locationFactory;
   private final YarnTwillControllerFactory controllerFactory;
   private final RunId runId;
@@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer {
   private LogEntry.Level logLevel;
 
   YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
-                    YarnAppClient yarnAppClient, ZKClient zkClient,
+                    YarnAppClient yarnAppClient, String zkConnectString,
                     LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
                     YarnTwillControllerFactory controllerFactory) {
     this.yarnConfig = yarnConfig;
     this.twillSpec = twillSpec;
     this.yarnAppClient = yarnAppClient;
-    this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+    this.zkConnectString = zkConnectString;
     this.locationFactory = locationFactory;
     this.controllerFactory = controllerFactory;
     this.runId = RunIds.generate();
@@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer {
           ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
             .put(EnvKeys.TWILL_FS_USER, fsUser)
             .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
-            .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+            .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
             .put(EnvKeys.TWILL_RUN_ID, runId.getId())
             .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
             .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 8a13017..c5853d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
     final TwillSpecification twillSpec = application.configure();
     final String appName = twillSpec.getName();
 
-    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
-                                 LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
+                                 locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
       @Override
       public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
                                         Callable<ProcessController<YarnApplicationReport>> startUp) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index b5c7f58..a9cf2ed 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -109,4 +109,8 @@ public abstract class BaseYarnTest {
   public List<NodeReport> getNodeReports() throws Exception {
     return TWILL_TESTER.getNodeReports();
   }
+
+  public String getZKConnectionString() {
+    return TWILL_TESTER.getZKConnectionString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 3f0f20c..13c07b1 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -31,23 +31,22 @@ import org.apache.twill.api.TwillRunnerService;
 import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.common.Threads;
 import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.zookeeper.ZKClientService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.Socket;
-import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 
 /**
@@ -58,8 +57,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
   private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
 
   @Test
-  public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
-    URISyntaxException, TimeoutException {
+  public void testEchoServer() throws Exception {
     TwillRunner runner = getTwillRunner();
 
     TwillController controller = runner.prepare(new EchoServer(),
@@ -158,6 +156,64 @@ public final class EchoServerTestRun extends BaseYarnTest {
     TimeUnit.SECONDS.sleep(2);
   }
 
+  @Test
+  public void testZKCleanup() throws Exception {
+    ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build();
+    zkClient.startAndWait();
+
+    try {
+      TwillRunner runner = getTwillRunner();
+
+      // Start an application and stop it.
+      TwillController controller = runner.prepare(new EchoServer())
+        .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+        .withApplicationArguments("echo")
+        .withArguments("EchoServer", "echo2")
+        .start();
+
+      Iterable<Discoverable> echoServices = controller.discoverService("echo");
+      Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+      controller.terminate().get();
+
+      // Verify the ZK node gets cleaned up
+      Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+      // Start two instances of the application and stop one of them
+      List<TwillController> controllers = new ArrayList<>();
+      for (int i = 0; i < 2; i++) {
+        controllers.add(runner.prepare(new EchoServer())
+                          .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+                          .withApplicationArguments("echo")
+                          .withArguments("EchoServer", "echo2")
+                          .start());
+      }
+
+      // There should be two instances up and running.
+      echoServices = controller.discoverService("echo");
+      Assert.assertTrue(waitForSize(echoServices, 2, 120));
+
+      // Stop one instance of the app
+      controllers.get(0).terminate().get();
+
+      // Verify the ZK node should still be there
+      Assert.assertNotNull(zkClient.exists("/EchoServer").get());
+
+      // We should still be able to do discovery, which depends on the ZK node.
+      echoServices = controller.discoverService("echo");
+      Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+      // Stop second instance of the app
+      controllers.get(1).terminate().get();
+
+      // Verify the ZK node gets cleaned up
+      Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
   /**
    *  Need helper method here to wait for getting resource report because {@link TwillController#getResourceReport()}
    *  could return null if the application has not fully started.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index f669b83..e604cec 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -160,6 +160,10 @@ public class TwillTester extends ExternalResource {
     return yarnAppClient.getNodeReports();
   }
 
+  public String getZKConnectionString() {
+    return zkServer.getConnectionStr();
+  }
+
   private void stopQuietly(Service service) {
     try {
       service.stopAndWait();


[20/22] incubator-twill git commit: Bump version to 0.8.0-incubating-SNAPSHOT

Posted by ch...@apache.org.
Bump version to 0.8.0-incubating-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/59bf3883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/59bf3883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/59bf3883

Branch: refs/heads/site
Commit: 59bf3883f2016fffae1a4b68aa68b28912ca993d
Parents: c6b3f0d
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:36:57 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:36:57 2016 -0800

----------------------------------------------------------------------
 pom.xml                      | 2 +-
 twill-api/pom.xml            | 2 +-
 twill-common/pom.xml         | 2 +-
 twill-core/pom.xml           | 2 +-
 twill-discovery-api/pom.xml  | 2 +-
 twill-discovery-core/pom.xml | 2 +-
 twill-examples/echo/pom.xml  | 2 +-
 twill-examples/pom.xml       | 2 +-
 twill-examples/yarn/pom.xml  | 2 +-
 twill-ext/pom.xml            | 2 +-
 twill-java8-test/pom.xml     | 2 +-
 twill-yarn/pom.xml           | 2 +-
 twill-zookeeper/pom.xml      | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c116322..c4792c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <groupId>org.apache.twill</groupId>
     <artifactId>twill-parent</artifactId>
-    <version>0.7.0-incubating</version>
+    <version>0.8.0-incubating-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>Apache Twill</name>
     <url>http://twill.incubator.apache.org</url>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-api/pom.xml b/twill-api/pom.xml
index 3cb6d97..ee34b84 100644
--- a/twill-api/pom.xml
+++ b/twill-api/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>twill-api</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index d1acb55..cd25620 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index e6b5514..88212ec 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
index a38aff3..a7723d5 100644
--- a/twill-discovery-api/pom.xml
+++ b/twill-discovery-api/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
index f80510d..4264ef1 100644
--- a/twill-discovery-core/pom.xml
+++ b/twill-discovery-core/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/echo/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/echo/pom.xml b/twill-examples/echo/pom.xml
index 474127e..6893236 100644
--- a/twill-examples/echo/pom.xml
+++ b/twill-examples/echo/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-examples</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <name>Apache Twill examples: Echo</name>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/pom.xml b/twill-examples/pom.xml
index 3e2929b..0550876 100644
--- a/twill-examples/pom.xml
+++ b/twill-examples/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>twill-examples</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/yarn/pom.xml b/twill-examples/yarn/pom.xml
index dea3777..58d4b99 100644
--- a/twill-examples/yarn/pom.xml
+++ b/twill-examples/yarn/pom.xml
@@ -24,7 +24,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-examples</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <name>Apache Twill examples: YARN</name>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-ext/pom.xml
----------------------------------------------------------------------
diff --git a/twill-ext/pom.xml b/twill-ext/pom.xml
index 0eb45cd..c5bab6c 100644
--- a/twill-ext/pom.xml
+++ b/twill-ext/pom.xml
@@ -22,7 +22,7 @@ limitations under the License.
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-java8-test/pom.xml
----------------------------------------------------------------------
diff --git a/twill-java8-test/pom.xml b/twill-java8-test/pom.xml
index 65b1c6e..fc8ccae 100644
--- a/twill-java8-test/pom.xml
+++ b/twill-java8-test/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.twill</groupId>
         <artifactId>twill-parent</artifactId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>twill-java8-test</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 1df959a..acfacaa 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 9edc990..2f680f7 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>twill-parent</artifactId>
         <groupId>org.apache.twill</groupId>
-        <version>0.7.0-incubating</version>
+        <version>0.8.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 


[05/22] incubator-twill git commit: (TWILL-154) Remove hardcoded check for “hdfs” or “maprfs” URI prefix

Posted by ch...@apache.org.
(TWILL-154) Remove hardcoded check for “hdfs” or “maprfs” URI prefix

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/ef8b1eae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/ef8b1eae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/ef8b1eae

Branch: refs/heads/site
Commit: ef8b1eae804db155dd05b20163e507521694647b
Parents: f88e18f
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 9 13:42:22 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 9 15:36:40 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/ServiceMain.java  | 21 +++++++++-----------
 .../apache/twill/yarn/YarnTwillPreparer.java    |  5 +++--
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ef8b1eae/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index f7bea24..a6af3d3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -136,20 +136,17 @@ public abstract class ServiceMain {
         return new LocalLocationFactory().create(appDir);
       }
 
-      if ("hdfs".equals(appDir.getScheme()) || "maprfs".equals(appDir.getScheme())) {
-        if (UserGroupInformation.isSecurityEnabled()) {
-          return new HDFSLocationFactory(FileSystem.get(appDir, conf)).create(appDir);
-        }
-
-        String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
-        if (fsUser == null) {
-          throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
-        }
-        return new HDFSLocationFactory(FileSystem.get(appDir, conf, fsUser)).create(appDir);
+      // If not file, assuming it is a FileSystem, hence construct with HDFSLocationFactory which wraps
+      // a FileSystem created from the Configuration
+      if (UserGroupInformation.isSecurityEnabled()) {
+        return new HDFSLocationFactory(FileSystem.get(appDir, conf)).create(appDir);
       }
 
-      LOG.warn("Unsupported location type {}.", appDir);
-      throw new IllegalArgumentException("Unsupported location type " + appDir);
+      String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
+      if (fsUser == null) {
+        throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
+      }
+      return new HDFSLocationFactory(FileSystem.get(appDir, conf, fsUser)).create(appDir);
 
     } catch (Exception e) {
       LOG.error("Failed to create application location for {}.", appDir);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ef8b1eae/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index a444dda..ea77116 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -52,6 +52,7 @@ import org.apache.twill.api.TwillPreparer;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.filesystem.HDFSLocationFactory;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.filesystem.LocationFactory;
 import org.apache.twill.internal.ApplicationBundler;
@@ -451,8 +452,8 @@ final class YarnTwillPreparer implements TwillPreparer {
         Location location;
 
         URI uri = localFile.getURI();
-        if ("hdfs".equals(uri.getScheme())) {
-          // Assuming the location factory is HDFS one. If it is not, it will failed, which is the correct behavior.
+        if (locationFactory.getHomeLocation().toURI().getScheme().equals(uri.getScheme())) {
+          // If the source file location has the same scheme as the target location, there is no need to copy
           location = locationFactory.create(uri);
         } else {
           URL url = uri.toURL();
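
Note on the scheme comparison in YarnTwillPreparer above: the check calls equals() on the home location's scheme, which would throw if that scheme were ever null, and it is case sensitive although URI schemes are defined as case insensitive. A minimal sketch of a more defensive comparison follows. SchemeCheck and sameScheme are hypothetical names used for illustration only; they are not part of Twill.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of Twill. */
final class SchemeCheck {

  private SchemeCheck() {
  }

  /**
   * Returns true only if both URIs carry a scheme and the schemes match,
   * ignoring case. A URI without a scheme never matches.
   */
  static boolean sameScheme(URI a, URI b) {
    String schemeA = a.getScheme();
    // equalsIgnoreCase(null) returns false, so a missing scheme on b is handled too.
    return schemeA != null && schemeA.equalsIgnoreCase(b.getScheme());
  }
}
```

With such a helper the condition could read sameScheme(locationFactory.getHomeLocation().toURI(), uri); whether a scheme-less URI should count as a match is a design choice, and this sketch treats it as a mismatch so that the file gets copied.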


[15/22] incubator-twill git commit: (TWILL-161) Added back off logic to SimpleKafkaConsumer

Posted by ch...@apache.org.
(TWILL-161) Added back off logic to SimpleKafkaConsumer

- Avoid excessive amount of polling and logs in case of failure

This closes #76 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b740da43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b740da43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b740da43

Branch: refs/heads/site
Commit: b740da4385ec542268508882e10069832f5ec35c
Parents: 388a6d9
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 5 14:04:29 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 15:40:33 2016 -0800

----------------------------------------------------------------------
 .../kafka/client/SimpleKafkaConsumer.java       | 48 +++++++++++---
 .../apache/twill/kafka/client/KafkaTest.java    | 68 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 230521c..8cfe889 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -72,7 +72,8 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
   private static final int SO_TIMEOUT = 5 * 1000;           // 5 seconds.
   private static final int MAX_WAIT = 1000;                 // 1 second.
   private static final long CONSUMER_EXPIRE_MINUTES = 1L;   // close consumer if not used for 1 minute.
-  private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L; // Sleep for 2 seconds if failure in consumer.
+  private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer.
+  private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer.
   private static final long EMPTY_FETCH_WAIT = 500L;        // Sleep for 500 ms if no message is fetched.
 
   private final BrokerService brokerService;
@@ -328,16 +329,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
       final AtomicLong offset = new AtomicLong(startOffset);
 
       Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
-
+      ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
+                                                          MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
       while (running) {
         if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
           LOG.debug("No leader for topic partition {}.", topicPart);
-          try {
-            TimeUnit.MILLISECONDS.sleep(CONSUMER_FAILURE_RETRY_INTERVAL);
-          } catch (InterruptedException e) {
-            // OK to ignore this, as interrupt would be caused by thread termination.
-            LOG.trace("Consumer sleep interrupted.", e);
-          }
+          backoff.backoff();
           continue;
         }
 
@@ -375,10 +372,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
 
           // Call the callback
           invokeCallback(messages, offset);
+          backoff.reset();
         } catch (Throwable t) {
           if (running || !(t instanceof ClosedByInterruptException)) {
             // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
             LOG.info("Exception when fetching message on {}.", topicPart, t);
+            backoff.backoff();
           }
           consumers.refresh(consumerEntry.getKey());
           consumerEntry = null;
@@ -477,5 +476,38 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
         }
       };
     }
+
+    /**
+     * Helper class for performing exponential backoff on message fetching failure.
+     */
+    private final class ExponentialBackoff {
+      private final long initialBackoff;
+      private final long maxBackoff;
+      private final TimeUnit backoffUnit;
+      private int failureCount = 0;
+
+      private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) {
+        this.initialBackoff = initialBackoff;
+        this.maxBackoff = maxBackoff;
+        this.backoffUnit = backoffUnit;
+      }
+
+      void backoff() {
+        failureCount++;
+        long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1));
+        long backoff = Math.min(initialBackoff * multiplier, maxBackoff);
+        backoff = backoff < 0 ? maxBackoff : backoff;
+        try {
+          backoffUnit.sleep(backoff);
+        } catch (InterruptedException e) {
+          // OK to ignore: this method is called only from the consumer thread, which is
+          // interrupted on shutdown.
+        }
+      }
+
+      void reset() {
+        failureCount = 0;
+      }
+    }
   }
 }
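
Note on the delay arithmetic in ExponentialBackoff above: initialBackoff * multiplier is a plain long multiplication and can wrap around. With the 100 ms initial value, 100 * (1L << 62) wraps to exactly 0, so it appears that after 63 consecutive failures the computed backoff would be 0 rather than the maximum, and the "backoff < 0" guard would not catch it. Reaching that count takes minutes of uninterrupted failures, so the effect is small. The following is a minimal sketch of an overflow-free way to compute the same capped doubling; BackoffSketch and nextDelay are hypothetical names, and the sketch only returns the delay instead of sleeping.

```java
/** Hypothetical standalone sketch; it is not the ExponentialBackoff class from the commit. */
final class BackoffSketch {
  private final long initialBackoff;
  private final long maxBackoff;
  private int failureCount = 0;

  BackoffSketch(long initialBackoff, long maxBackoff) {
    if (initialBackoff <= 0 || maxBackoff < initialBackoff) {
      throw new IllegalArgumentException("Require 0 < initialBackoff <= maxBackoff");
    }
    this.initialBackoff = initialBackoff;
    this.maxBackoff = maxBackoff;
  }

  /** Records one more failure and returns the delay for it; the caller does the sleeping. */
  long nextDelay() {
    if (failureCount < Integer.MAX_VALUE) {
      failureCount++;
    }
    long delay = initialBackoff;
    // Double once per earlier failure, stopping as soon as the cap is reached.
    for (int i = 1; i < failureCount && delay < maxBackoff; i++) {
      // delay * 2 is evaluated only when it cannot exceed maxBackoff, so it cannot overflow.
      delay = delay > maxBackoff / 2 ? maxBackoff : delay * 2;
    }
    return delay;
  }

  /** Forgets previous failures so the next delay starts from the initial value again. */
  void reset() {
    failureCount = 0;
  }
}
```

The loop stops as soon as the cap is hit, so it runs at most about 63 iterations regardless of how many failures have been recorded.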

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 93119ab..4ac8ae4 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,8 +42,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -84,6 +87,71 @@ public class KafkaTest {
   }
 
   @Test
+  public void testKafkaClientReconnect() throws Exception {
+    String topic = "backoff";
+    Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+    EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+
+    ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
+    zkClient.startAndWait();
+    try {
+      zkClient.create("/", null, CreateMode.PERSISTENT).get();
+
+      ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
+      kafkaClient.startAndWait();
+
+      try {
+        server.startAndWait();
+        try {
+          // Publish a message
+          createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
+
+          // Create a consumer
+          final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
+            .consume(new KafkaConsumer.MessageCallback() {
+              @Override
+              public void onReceived(Iterator<FetchedMessage> messages) {
+                while (messages.hasNext()) {
+                  queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+                }
+              }
+
+              @Override
+              public void finished() {
+              }
+            });
+
+          // Wait for the first message
+          Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
+
+          // Shutdown the server
+          server.stopAndWait();
+
+          // Start the server again.
+          // Needs to create a new instance with the same config since guava service cannot be restarted
+          server = new EmbeddedKafkaServer(kafkServerConfig);
+          server.startAndWait();
+
+          // Publish another message
+          createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
+
+          // Should be able to get the second message
+          Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS));
+
+          cancel.cancel();
+        } finally {
+          kafkaClient.stopAndWait();
+        }
+      } finally {
+        server.stopAndWait();
+      }
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
+  @Test
   public void testKafkaClient() throws Exception {
     String topic = "testClient";
 


[21/22] incubator-twill git commit: Update license statement year to 2016

Posted by ch...@apache.org.
Update license statement year to 2016

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/180e446d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/180e446d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/180e446d

Branch: refs/heads/site
Commit: 180e446d23709706bf189fed66529bc2be6c3884
Parents: 59bf388
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 26 11:08:53 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 26 11:08:53 2016 -0800

----------------------------------------------------------------------
 NOTICE  | 2 +-
 pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/180e446d/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 17c9ea9..a88b8d4 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Twill
-Copyright 2013-2015 The Apache Software Foundation
+Copyright 2013-2016 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/180e446d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c4792c4..839214c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,7 +234,7 @@
                             <link>http://docs.oracle.com/javase/7/docs/api/</link>
                         </links>
                         <bottom>
-                            <![CDATA[Copyright &#169; 2013-2015 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
+                            <![CDATA[Copyright &#169; 2013-2016 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
                         </bottom>
                     </configuration>
                     <executions>


[17/22] incubator-twill git commit: Fix an easy to fail unit-test

Posted by ch...@apache.org.
Fix an easy to fail unit-test

 - The test has a race condition. The mutual discovery
   doesn’t work well because one runnable can finish
   and stop being discoverable before the other one
   tries to discover it.
 - Switch to an Echo server / client pair of runnables,
   in which only the client needs to discover the server.

This closes #77 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/900e3829
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/900e3829
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/900e3829

Branch: refs/heads/site
Commit: 900e382938090fb4585389cfa7d0acf020f088e8
Parents: 323e5af
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 5 23:46:06 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jan 6 22:16:31 2016 -0800

----------------------------------------------------------------------
 .../twill/yarn/ServiceDiscoveryTestRun.java     | 91 +++++++++++---------
 .../org/apache/twill/yarn/SocketServer.java     | 13 +--
 2 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
index 59fe835..77f53ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
@@ -17,8 +17,10 @@
  */
 package org.apache.twill.yarn;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.LineReader;
 import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.Command;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.api.TwillController;
@@ -34,9 +36,17 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -51,18 +61,16 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
     TwillController controller = twillRunner
       .prepare(new ServiceApplication())
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .withArguments("r1", "12345")
-      .withArguments("r2", "45678")
+      .withApplicationArguments("echo")
       .start();
 
-    ServiceDiscovered completed = controller.discoverService("completed");
-    Assert.assertTrue(waitForSize(completed, 2, 120));
-    controller.sendCommand(Command.Builder.of("done").build());
-    controller.awaitTerminated(120, TimeUnit.SECONDS);
+    ServiceDiscovered discovered = controller.discoverService("discovered");
+    Assert.assertTrue(waitForSize(discovered, 1, 120));
+    controller.terminate().get();
   }
 
   /**
-   * An application that contains two {@link ServiceRunnable}.
+   * An application that contains an EchoServer and an EchoClient.
    */
   public static final class ServiceApplication implements TwillApplication {
 
@@ -71,66 +79,63 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
       return TwillSpecification.Builder.with()
         .setName("ServiceApp")
         .withRunnable()
-          .add("r1", new ServiceRunnable()).noLocalFiles()
-          .add("r2", new ServiceRunnable()).noLocalFiles()
+          .add("server", new EchoServer()).noLocalFiles()
+          .add("client", new EchoClient()).noLocalFiles()
         .anyOrder()
         .build();
     }
   }
 
   /**
-   * A Runnable that will announce on service and wait for announcement from another instance in the same service.
+   * A runnable to discover the echo server and issue a call to it.
    */
-  public static final class ServiceRunnable extends AbstractTwillRunnable {
+  public static final class EchoClient extends AbstractTwillRunnable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
-    private static final String SERVICE_NAME = "service";
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    private static final Logger LOG = LoggerFactory.getLogger(EchoClient.class);
+
+    private final CountDownLatch completion = new CountDownLatch(1);
 
     @Override
     public void run() {
-      final int port = Integer.parseInt(getContext().getArguments()[0]);
-      Cancellable cancelService = getContext().announce(SERVICE_NAME, port);
-
-      final CountDownLatch discoveredLatch = new CountDownLatch(1);
-
-      ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME);
+      final BlockingQueue<Discoverable> discoverables = new LinkedBlockingQueue<>();
+      ServiceDiscovered serviceDiscovered = getContext().discover(getContext().getApplicationArguments()[0]);
       serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
         @Override
         public void onChange(ServiceDiscovered serviceDiscovered) {
-          // Try to find a discoverable that is not this instance
-          for (Discoverable discoverable : serviceDiscovered) {
-            int discoveredPort = discoverable.getSocketAddress().getPort();
-            if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) {
-              LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort);
-              discoveredLatch.countDown();
-            }
-          }
+          Iterables.addAll(discoverables, serviceDiscovered);
         }
       }, Threads.SAME_THREAD_EXECUTOR);
 
       try {
-        discoveredLatch.await();
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted.", e);
-      }
+        Discoverable discoverable = discoverables.poll(120, TimeUnit.SECONDS);
+        // Make a call to the echo server
+        InetSocketAddress address = discoverable.getSocketAddress();
+        Socket socket = new Socket(address.getAddress(), address.getPort());
+        String message = "Hello World";
+        try (PrintWriter printer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"))) {
+          printer.println(message);
+          printer.flush();
 
-      // Announce the "complete" service so that the driver knows this runnable has discovered the other
-      Cancellable cancelCompleted = getContext().announce("completed", port);
-      try {
-        stopLatch.await();
-        cancelService.cancel();
-        cancelCompleted.cancel();
+          try (Reader reader = new InputStreamReader(socket.getInputStream(), "UTF-8")) {
+            LineReader lineReader = new LineReader(reader);
+            String line = lineReader.readLine();
+            Preconditions.checkState(message.equals(line), "Expected %s, got %s", message, line);
+          }
+        }
+
+        Cancellable cancellable = getContext().announce("discovered", 12345);
+        completion.await();
+        cancellable.cancel();
       } catch (InterruptedException e) {
         LOG.warn("Interrupted.", e);
+      } catch (IOException e) {
+        LOG.error("Failed to talk to server", e);
       }
     }
 
     @Override
-    public void handleCommand(Command command) throws Exception {
-      if ("done".equals(command.getCommand())) {
-        stopLatch.countDown();
-      }
+    public void stop() {
+      completion.countDown();
     }
   }
 }
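
Note on the timed poll in EchoClient above: BlockingQueue.poll with a timeout returns null when nothing arrives in time, so the call to discoverable.getSocketAddress() would fail with a NullPointerException if discovery took longer than 120 seconds. A minimal sketch of turning that case into an explicit timeout follows. PollSketch and pollOrFail are hypothetical names and are not part of the test or of Twill.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration; not part of the test or of Twill. */
final class PollSketch {

  private PollSketch() {
  }

  /**
   * Waits up to the given timeout for an element and fails with a
   * TimeoutException instead of returning null when none arrives.
   */
  static <T> T pollOrFail(BlockingQueue<T> queue, long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException {
    T item = queue.poll(timeout, unit);
    if (item == null) {
      throw new TimeoutException("Nothing received within " + timeout + " " + unit);
    }
    return item;
  }
}
```

In a test runnable this mainly improves the failure message: the log would name the timeout rather than show a NullPointerException from an unrelated line.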

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
index fd38576..e9e6a99 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -19,7 +19,6 @@ package org.apache.twill.yarn;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.common.Cancellable;
@@ -34,6 +33,7 @@ import java.io.PrintWriter;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -57,10 +57,13 @@ public abstract class SocketServer extends AbstractTwillRunnable {
                ", id: " + context.getInstanceId() +
                ", count: " + context.getInstanceCount());
 
-      final List<Cancellable> cancellables = ImmutableList.of(
-        context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
-        context.announce(context.getArguments()[0], serverSocket.getLocalPort())
-      );
+      // Announce with service names as specified in app arguments and runnable arguments
+      final List<Cancellable> cancellables = new ArrayList<>();
+      for (String[] args : new String[][] {context.getApplicationArguments(), context.getArguments()}) {
+        if (args.length > 0) {
+          cancellables.add(context.announce(args[0], serverSocket.getLocalPort()));
+        }
+      }
       canceller = new Cancellable() {
         @Override
         public void cancel() {
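
Note on the SocketServer change above: the previous ImmutableList.of(...) form read index 0 of both argument arrays unconditionally, so it would presumably throw ArrayIndexOutOfBoundsException when a runnable is started with application arguments only, as ServiceDiscoveryTestRun now does. The selection logic of the new loop can be sketched in isolation as below. AnnounceSketch and serviceNames are hypothetical names, and the sketch only collects the names instead of calling context.announce.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the name selection only; it does not use the Twill API. */
final class AnnounceSketch {

  private AnnounceSketch() {
  }

  /** Returns the first element of every non-empty argument array, in the order given. */
  static List<String> serviceNames(String[]... argumentSets) {
    List<String> names = new ArrayList<>();
    for (String[] args : argumentSets) {
      // Skip missing or empty argument arrays instead of indexing into them.
      if (args != null && args.length > 0) {
        names.add(args[0]);
      }
    }
    return names;
  }
}
```

For example, serviceNames(new String[] {"echo"}, new String[0]) yields a single name, "echo", which matches the case where only application arguments are supplied.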


[12/22] incubator-twill git commit: Simple JavaDoc fix for a typo, from denoteed to denoted.

Posted by ch...@apache.org.
Simple JavaDoc fix for a typo, from denoteed to denoted.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/65c730b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/65c730b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/65c730b8

Branch: refs/heads/site
Commit: 65c730b82528da669c6abaae96284e18defd8402
Parents: 359b12b
Author: Henry Saputra <hs...@apache.org>
Authored: Sat Dec 26 13:48:39 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Dec 27 10:53:05 2015 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/twill/filesystem/HDFSLocation.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/65c730b8/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index 2ab97db..818fe23 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -121,7 +121,7 @@ final class HDFSLocation implements Location {
   }
 
   /**
-   * @return Returns the name of the file or directory denoteed by this abstract pathname.
+   * @return Returns the name of the file or directory denoted by this abstract pathname.
    */
   @Override
   public String getName() {