You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/03/22 05:20:05 UTC

[01/15] twill git commit: (TWILL-248) Upgrade to use Netty-4.1

Repository: twill
Updated Branches:
  refs/heads/site 49f35954d -> 2a97588cc


(TWILL-248) Upgrade to use Netty-4.1

- Also enable ResourceReportClient to use HTTP compression

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d6b3882
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d6b3882
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d6b3882

Branch: refs/heads/site
Commit: 6d6b3882a631e2260865f641674ee362c52fd1a0
Parents: f34a39a
Author: Terence Yim <ch...@apache.org>
Authored: Tue Oct 10 13:26:11 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Oct 16 09:10:18 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  14 +-
 twill-core/pom.xml                              |  10 +-
 .../internal/appmaster/TrackerService.java      | 217 ++++++++++---------
 .../apache/twill/yarn/ResourceReportClient.java |  36 ++-
 4 files changed, 159 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6aa474..573936d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
         <guava.version>13.0.1</guava.version>
         <gson.version>2.2.4</gson.version>
         <findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
-        <netty.version>3.6.6.Final</netty.version>
+        <netty.version>4.1.16.Final</netty.version>
         <snappy-java.version>1.0.5</snappy-java.version>
         <jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
         <asm.version>5.0.2</asm.version>
@@ -777,7 +777,17 @@
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
-                <artifactId>netty</artifactId>
+                <artifactId>netty-buffer</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-codec-http</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-handler</artifactId>
                 <version>${netty.version}</version>
             </dependency>
             <dependency>

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index f265f26..4bee172 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -55,7 +55,15 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
         </dependency>
         <dependency>
             <groupId>org.xerial.snappy</groupId>

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index f91efcc..bb8cf57 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -20,38 +20,36 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.util.CharsetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,8 +60,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -78,14 +74,15 @@ public final class TrackerService extends AbstractIdleService {
 
   private static final Logger LOG  = LoggerFactory.getLogger(TrackerService.class);
   private static final int NUM_BOSS_THREADS = 1;
+  private static final int NUM_WORKER_THREADS = 10;
   private static final int CLOSE_CHANNEL_TIMEOUT = 5;
   private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
 
   private final Supplier<ResourceReport> resourceReport;
-  private final ChannelGroup channelGroup;
 
   private String host;
   private ServerBootstrap bootstrap;
+  private Channel serverChannel;
   private InetSocketAddress bindAddress;
   private URL url;
 
@@ -95,7 +92,6 @@ public final class TrackerService extends AbstractIdleService {
    * @param resourceReport live report that the service will return to clients.
    */
   TrackerService(Supplier<ResourceReport> resourceReport) {
-    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
     this.resourceReport = resourceReport;
   }
 
@@ -123,52 +119,40 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
-    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
-                                                        new ThreadFactoryBuilder()
-                                                          .setDaemon(true)
-                                                          .setNameFormat("boss-thread")
-                                                          .build());
-
-    Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                                                             .setDaemon(true)
-                                                             .setNameFormat("worker-thread#%d")
-                                                             .build());
-
-    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
-
-    bootstrap = new ServerBootstrap(factory);
-
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      public ChannelPipeline getPipeline() {
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        pipeline.addLast("decoder", new HttpRequestDecoder());
-        pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
-        pipeline.addLast("encoder", new HttpResponseEncoder());
-        pipeline.addLast("compressor", new HttpContentCompressor());
-        pipeline.addLast("handler", new ReportHandler());
-
-        return pipeline;
-      }
-    });
-
-    Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
-    bindAddress = (InetSocketAddress) channel.getLocalAddress();
+    EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
+                                                     new ThreadFactoryBuilder()
+                                                       .setDaemon(true).setNameFormat("boss-thread").build());
+    EventLoopGroup workerGroup = new NioEventLoopGroup(NUM_WORKER_THREADS,
+                                                       new ThreadFactoryBuilder()
+                                                         .setDaemon(true).setNameFormat("worker-thread#%d").build());
+
+    bootstrap = new ServerBootstrap()
+      .group(bossGroup, workerGroup)
+      .channel(NioServerSocketChannel.class)
+      .childHandler(new ChannelInitializer<SocketChannel>() {
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+          ChannelPipeline pipeline = ch.pipeline();
+          pipeline.addLast("codec", new HttpServerCodec());
+          pipeline.addLast("compressor", new HttpContentCompressor());
+          pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_INPUT_SIZE));
+          pipeline.addLast("handler", new ReportHandler());
+        }
+      });
+
+    serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+    bindAddress = (InetSocketAddress) serverChannel.localAddress();
     url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
-    channelGroup.add(channel);
 
     LOG.info("Tracker service started at {}", url);
   }
 
   @Override
   protected void shutDown() throws Exception {
-    try {
-      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
-        LOG.warn("Timeout when closing all channels.");
-      }
-    } finally {
-      bootstrap.releaseExternalResources();
-    }
+    serverChannel.close().await();
+    bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+    bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+
     LOG.info("Tracker service stopped at {}", url);
   }
 
@@ -176,7 +160,7 @@ public final class TrackerService extends AbstractIdleService {
    * Handler to return resources used by this application master, which will be available through
    * the host and port set when this application master registered itself to the resource manager.
    */
-  final class ReportHandler extends SimpleChannelUpstreamHandler {
+  final class ReportHandler extends ChannelInboundHandlerAdapter {
     private final ResourceReportAdapter reportAdapter;
 
     ReportHandler() {
@@ -184,51 +168,68 @@ public final class TrackerService extends AbstractIdleService {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != HttpMethod.GET) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
-        response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
-        response.setContent(ChannelBuffers.wrappedBuffer("Only GET is supported".getBytes(StandardCharsets.UTF_8)));
-        writeResponse(e.getChannel(), response);
-        return;
-      }
-
-      if (!PATH.equals(request.getUri())) {
-        // Redirect all GET call to the /resources path.
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
-        response.setHeader(HttpHeaders.Names.LOCATION, PATH);
-        writeResponse(e.getChannel(), response);
-        return;
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+      try {
+        if (!(msg instanceof HttpRequest)) {
+          // Ignore if it is not HttpRequest
+          return;
+        }
+
+        HttpRequest request = (HttpRequest) msg;
+        if (!HttpMethod.GET.equals(request.method())) {
+          FullHttpResponse response = new DefaultFullHttpResponse(
+            HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED,
+            Unpooled.copiedBuffer("Only GET is supported", StandardCharsets.UTF_8));
+
+          HttpUtil.setContentLength(response, response.content().readableBytes());
+          response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+          writeAndClose(ctx.channel(), response);
+          return;
+        }
+
+        if (!PATH.equals(request.uri())) {
+          // Redirect all GET call to the /resources path.
+          HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+                                                              HttpResponseStatus.TEMPORARY_REDIRECT);
+          HttpUtil.setContentLength(response, 0);
+          response.headers().set(HttpHeaderNames.LOCATION, PATH);
+          writeAndClose(ctx.channel(), response);
+          return;
+        }
+
+        writeResourceReport(ctx.channel());
+      } finally {
+        ReferenceCountUtil.release(msg);
       }
+    }
 
-      writeResourceReport(e.getChannel());
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      ctx.channel().close();
     }
 
     private void writeResourceReport(Channel channel) {
-      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
-
-      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
-      Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
-      reportAdapter.toJson(resourceReport.get(), writer);
+      ByteBuf content = Unpooled.buffer();
+      Writer writer = new OutputStreamWriter(new ByteBufOutputStream(content), CharsetUtil.UTF_8);
       try {
+        reportAdapter.toJson(resourceReport.get(), writer);
         writer.close();
-      } catch (IOException e1) {
-        LOG.error("error writing resource report", e1);
+      } catch (IOException e) {
+        LOG.error("error writing resource report", e);
+        writeAndClose(channel, new DefaultFullHttpResponse(
+          HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+          Unpooled.copiedBuffer(e.getMessage(), StandardCharsets.UTF_8)));
+        return;
       }
-      response.setContent(content);
-      writeResponse(channel, response);
-    }
 
-    private void writeResponse(Channel channel, HttpResponse response) {
-      ChannelFuture future = channel.write(response);
-      future.addListener(ChannelFutureListener.CLOSE);
+      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+      HttpUtil.setContentLength(response, content.readableBytes());
+      response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
+      channel.writeAndFlush(response);
     }
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      e.getChannel().close();
+    private void writeAndClose(Channel channel, HttpResponse response) {
+      channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
index fb8b7e8..4676751 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -17,19 +17,21 @@
  */
 package org.apache.twill.yarn;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.zip.DeflaterInputStream;
+import java.util.zip.GZIPInputStream;
 
 /**
  * Package private class to get {@link ResourceReport} from the application master.
@@ -52,12 +54,16 @@ final class ResourceReportClient {
   public ResourceReport get() {
     for (URL url : resourceUrls) {
       try {
-        Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8));
-        try {
+        HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
+        urlConn.setRequestProperty("Accept-Encoding", "gzip, deflate");
+
+        if (urlConn.getResponseCode() != 200) {
+          continue;
+        }
+
+        try (Reader reader = new InputStreamReader(getInputStream(urlConn), StandardCharsets.UTF_8)) {
           LOG.trace("Report returned by {}", url);
           return reportAdapter.fromJson(reader);
-        } finally {
-          Closeables.closeQuietly(reader);
         }
       } catch (IOException e) {
         // Just log a trace as it's ok to not able to fetch resource report
@@ -66,4 +72,20 @@ final class ResourceReportClient {
     }
     return null;
   }
+
+  private InputStream getInputStream(HttpURLConnection urlConn) throws IOException {
+    InputStream is = urlConn.getInputStream();
+    String contentEncoding = urlConn.getContentEncoding();
+    if (contentEncoding == null) {
+      return is;
+    }
+    if ("gzip".equalsIgnoreCase(contentEncoding)) {
+      return new GZIPInputStream(is);
+    }
+    if ("deflate".equalsIgnoreCase(contentEncoding)) {
+      return new DeflaterInputStream(is);
+    }
+    // This should never happen
+    throw new IOException("Unsupported content encoding " + contentEncoding);
+  }
 }


[04/15] twill git commit: (TWILL-251) Reduce log level of YarnNMClient

Posted by ch...@apache.org.
(TWILL-251) Reduce log level of YarnNMClient

- Also reduce the polling frequency

This closes #64 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/55f6d6fc
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/55f6d6fc
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/55f6d6fc

Branch: refs/heads/site
Commit: 55f6d6fc9b088e1fa9a1462a92146d608bbebb22
Parents: b7785bd
Author: Terence Yim <ch...@apache.org>
Authored: Fri Dec 1 13:21:07 2017 -0600
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Dec 4 10:21:05 2017 -0800

----------------------------------------------------------------------
 .../twill/internal/yarn/Hadoop20YarnNMClient.java     | 12 ++++++++----
 .../twill/internal/yarn/Hadoop21YarnNMClient.java     | 14 +++++++++-----
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/55f6d6fc/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
index b43e4e1..e8628da 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -18,6 +18,7 @@
 package org.apache.twill.internal.yarn;
 
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -102,14 +104,16 @@ public final class Hadoop20YarnNMClient implements YarnNMClient {
       stopRequest.setContainerId(container.getId());
       try {
         manager.stopContainer(stopRequest);
-        boolean completed = false;
-        while (!completed) {
+        while (true) {
           GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
           statusRequest.setContainerId(container.getId());
           GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
-          LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+          LOG.trace("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
 
-          completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
+          if (statusResponse.getStatus().getState() == ContainerState.COMPLETE) {
+            break;
+          }
+          Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
         }
         LOG.info("Container {} stopped.", container.getId());
       } catch (YarnRemoteException e) {

http://git-wip-us.apache.org/repos/asf/twill/blob/55f6d6fc/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
index 8c5f0fc..dcdeb70 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.yarn;
 
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -29,6 +30,8 @@ import org.apache.twill.common.Cancellable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Wrapper class for {@link NMClient} for Hadoop version 2.1 or greater.
  */
@@ -82,12 +85,13 @@ public final class Hadoop21YarnNMClient extends AbstractIdleService implements Y
 
       try {
         nmClient.stopContainer(container.getId(), container.getNodeId());
-        boolean completed = false;
-        while (!completed) {
+        while (true) {
           ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
-          LOG.info("Container status: {} {}", status, status.getDiagnostics());
-
-          completed = (status.getState() == ContainerState.COMPLETE);
+          LOG.trace("Container status: {} {}", status, status.getDiagnostics());
+          if (status.getState() == ContainerState.COMPLETE) {
+            break;
+          }
+          Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
         }
         LOG.info("Container {} stopped.", container.getId());
       } catch (Exception e) {


[07/15] twill git commit: - Added hadoop-2.6 profile to travis - Also runs the Java8 tests with hadoop-2.6

Posted by ch...@apache.org.
- Added a hadoop-2.6 profile to the Travis CI configuration
- Also run the Java 8 tests with hadoop-2.6 (previously hadoop-2.5)

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/af60a021
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/af60a021
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/af60a021

Branch: refs/heads/site
Commit: af60a0215c43d1e8a0853b2ff4b65ed278f06290
Parents: 00a844a
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 12:31:12 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Mar 9 12:31:12 2018 -0800

----------------------------------------------------------------------
 .travis.yml | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/af60a021/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 93cf462..fae270d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,9 +41,10 @@ env:
   - PROFILE='hadoop-2.2'
   - PROFILE='hadoop-2.4'
   - PROFILE='hadoop-2.5'
+  - PROFILE='hadoop-2.6'
   - PROFILE='cdh-4.4.0'
   - PROFILE='mapr-hadoop-2.4'
-  - PROFILE='hadoop-2.5,java8-test'
+  - PROFILE='hadoop-2.6,java8-test'
 
 # Only runs JDK8 on hadoop-2.5 profile
 matrix:
@@ -59,11 +60,13 @@ matrix:
     - jdk: oraclejdk8
       env: PROFILE='hadoop-2.5'
     - jdk: oraclejdk8
+      env: PROFILE='hadoop-2.6'
+    - jdk: oraclejdk8
       env: PROFILE='cdh-4.4.0'
     - jdk: oraclejdk8
       env: PROFILE='mapr-hadoop-2.4'
     - jdk: oraclejdk7
-      env: PROFILE='hadoop-2.5,java8-test'
+      env: PROFILE='hadoop-2.6,java8-test'
 
 sudo: false
 


[05/15] twill git commit: (TWILL-254) Update to use ContainerId.fromString

Posted by ch...@apache.org.
(TWILL-254) Update to use ContainerId.fromString

This closes #65 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/d6095d48
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/d6095d48
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/d6095d48

Branch: refs/heads/site
Commit: d6095d4876d55c8e614a11bf66d780e0856481f8
Parents: 55f6d6f
Author: Clay Baenziger <cb...@bloomberg.net>
Authored: Fri Feb 2 22:35:37 2018 -0500
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Feb 5 16:59:50 2018 -0800

----------------------------------------------------------------------
 pom.xml                                         | 51 ++++++++++++++++++--
 .../internal/yarn/Hadoop20YarnAMClient.java     |  7 +++
 .../internal/yarn/Hadoop21YarnAMClient.java     |  7 +++
 .../internal/yarn/Hadoop22YarnAMClient.java     |  2 +-
 .../internal/yarn/Hadoop26YarnAMClient.java     | 42 ++++++++++++++++
 .../internal/yarn/AbstractYarnAMClient.java     | 12 +++--
 .../yarn/VersionDetectYarnAMClientFactory.java  | 10 +++-
 .../apache/twill/internal/yarn/YarnUtils.java   | 13 ++++-
 8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 573936d..f804fd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,7 +171,6 @@
         <junit.version>4.11</junit.version>
         <jopt-simple.version>3.2</jopt-simple.version>
         <commons-compress.version>1.5</commons-compress.version>
-        <hadoop.version>[2.0.2-alpha,2.3.0]</hadoop.version>
         <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
     </properties>
 
@@ -538,9 +537,6 @@
             <properties>
                 <hadoop.version>2.3.0</hadoop.version>
             </properties>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
             <build>
                 <plugins>
                     <plugin>
@@ -723,6 +719,53 @@
             </build>
         </profile>
         <profile>
+            <id>hadoop-2.6</id>
+            <properties>
+                <hadoop.version>2.6.5</hadoop.version>
+            </properties>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop21</source>
+                                        <source>src/main/hadoop22</source>
+                                        <source>src/main/hadoop23</source>
+                                        <source>src/main/hadoop26</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>add-source-2.0</id>
+                                <phase>prepare-package</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop20</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
             <id>java8-test</id>
             <modules>
                 <module>twill-java8-test</module>

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index a990cc0..67bef3e 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
 import org.apache.twill.internal.yarn.ports.AMRMClient;
 import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
@@ -72,6 +74,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
   }
 
   @Override
+  protected ContainerId containerIdLookup(String containerIdStr) {
+    return (ConverterUtils.toContainerId(containerIdStr));
+  }
+
+  @Override
   protected void startUp() throws Exception {
     Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
     Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 82f428a..f349b4e 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +59,11 @@ public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.Contai
     };
   }
 
+  @Override
+  protected ContainerId containerIdLookup(String containerIdStr) {
+    return (ConverterUtils.toContainerId(containerIdStr));
+  }
+
   protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
   protected final Hadoop21YarnNMClient nmClient;
   protected Resource maxCapability;

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
index 3ee99e8..c1bb5fd 100644
--- a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -26,7 +26,7 @@ import java.util.List;
 /**
  * Wrapper class for AMRMClient for Hadoop version 2.2 or greater.
  */
-public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+public class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
 

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
new file mode 100644
index 0000000..372c422
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Wrapper class for AMRMClient for Hadoop version 2.6 or greater.
+ */
+public final class Hadoop26YarnAMClient extends Hadoop22YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop26YarnAMClient.class);
+
+  public Hadoop26YarnAMClient(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  protected final ContainerId containerIdLookup(String containerIdStr) {
+    return (ContainerId.fromString(containerIdStr));
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index e8f3b2b..b4b50cd 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
 import org.slf4j.Logger;
@@ -80,7 +79,7 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
     String masterContainerId = System.getenv().get(containerIdEnvName);
     Preconditions.checkArgument(masterContainerId != null,
                                 "Missing %s from environment", containerIdEnvName);
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerId = containerIdLookup(masterContainerId);
     this.inflightRequests = ArrayListMultimap.create();
     this.pendingRequests = ArrayListMultimap.create();
     this.pendingRemoves = Lists.newLinkedList();
@@ -89,7 +88,6 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
     this.blacklistedResources = Lists.newArrayList();
   }
 
-
   @Override
   public final ContainerId getContainerId() {
     return containerId;
@@ -227,6 +225,14 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
   }
 
   /**
+   * Returns the ContainerId given a container ID string
+   *
+   * @param containerIdStr the container ID string to lookup
+   * @return A {@link ContainerId} instance representing the result.
+   */
+  protected abstract ContainerId containerIdLookup(String containerIdStr);
+
+  /**
    * Adjusts the given resource capability to fit in the cluster limit.
    *
    * @param capability The capability to be adjusted.

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index 943efb8..c6ab02b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -48,11 +48,17 @@ public final class VersionDetectYarnAMClientFactory implements YarnAMClientFacto
           clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
           clz = (Class<YarnAMClient>) Class.forName(clzName);
           break;
-        default:
-          // Uses hadoop-2.2 or above class
+        case HADOOP_22:
+        case HADOOP_23:
+          // Uses hadoop-2.2 class
           clzName = getClass().getPackage().getName() + ".Hadoop22YarnAMClient";
           clz = (Class<YarnAMClient>) Class.forName(clzName);
           break;
+        default:
+          // Uses hadoop-2.6 or above class
+          clzName = getClass().getPackage().getName() + ".Hadoop26YarnAMClient";
+          clz = (Class<YarnAMClient>) Class.forName(clzName);
+          break;
       }
 
       return clz.getConstructor(Configuration.class).newInstance(conf);

http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index c0aeb0c..d7e6eb0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -68,7 +68,8 @@ public class YarnUtils {
     HADOOP_20,
     HADOOP_21,
     HADOOP_22,
-    HADOOP_23
+    HADOOP_23,
+    HADOOP_26
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
@@ -263,7 +264,15 @@ public class YarnUtils {
         Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
         try {
           Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
-          HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
+          try {
+            Class[] args = new Class[1];
+            args[0] = String.class;
+            // see if we have a org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
+            Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString", args);
+            HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
+          } catch (NoSuchMethodException e) {
+            HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
+          }
         } catch (ClassNotFoundException e) {
           HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
         }


[03/15] twill git commit: (TWILL-248) Speedup shutdown of tracker service

Posted by ch...@apache.org.
(TWILL-248) Speedup shutdown of tracker service

This closes #63 on Github

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b7785bde
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b7785bde
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b7785bde

Branch: refs/heads/site
Commit: b7785bde4e7e990072f803d89353e37f26ed8af5
Parents: aa70499
Author: Terence Yim <ch...@apache.org>
Authored: Mon Oct 30 15:10:34 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 31 10:52:16 2017 -0700

----------------------------------------------------------------------
 .../internal/appmaster/TrackerService.java      | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/b7785bde/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index bb8cf57..10de10c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -31,6 +31,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -48,6 +50,8 @@ import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.CharsetUtil;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.internal.json.ResourceReportAdapter;
 import org.slf4j.Logger;
@@ -60,6 +64,8 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -82,7 +88,7 @@ public final class TrackerService extends AbstractIdleService {
 
   private String host;
   private ServerBootstrap bootstrap;
-  private Channel serverChannel;
+  private ChannelGroup channelGroup;
   private InetSocketAddress bindAddress;
   private URL url;
 
@@ -119,6 +125,7 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
+    channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
     EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
                                                      new ThreadFactoryBuilder()
                                                        .setDaemon(true).setNameFormat("boss-thread").build());
@@ -132,6 +139,7 @@ public final class TrackerService extends AbstractIdleService {
       .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
+          channelGroup.add(ch);
           ChannelPipeline pipeline = ch.pipeline();
           pipeline.addLast("codec", new HttpServerCodec());
           pipeline.addLast("compressor", new HttpContentCompressor());
@@ -140,7 +148,9 @@ public final class TrackerService extends AbstractIdleService {
         }
       });
 
-    serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+    Channel serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+    channelGroup.add(serverChannel);
+
     bindAddress = (InetSocketAddress) serverChannel.localAddress();
     url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
 
@@ -149,9 +159,15 @@ public final class TrackerService extends AbstractIdleService {
 
   @Override
   protected void shutDown() throws Exception {
-    serverChannel.close().await();
-    bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
-    bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+    channelGroup.close().awaitUninterruptibly();
+
+    List<Future<?>> futures = new ArrayList<>();
+    futures.add(bootstrap.config().group().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+    futures.add(bootstrap.config().childGroup().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+
+    for (Future<?> future : futures) {
+      future.awaitUninterruptibly();
+    }
 
     LOG.info("Tracker service stopped at {}", url);
   }


[09/15] twill git commit: Add Yuliya Feldman to committer

Posted by ch...@apache.org.
Add Yuliya Feldman to committer

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/439c1096
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/439c1096
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/439c1096

Branch: refs/heads/site
Commit: 439c1096df09939e3f13749722746b767f4ca753
Parents: 7f494d1
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:49:49 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:49:49 2018 -0700

----------------------------------------------------------------------
 pom.xml | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/439c1096/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f804fd7..84c4b8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,14 @@
                 <role>Committer</role>
             </roles>
         </developer>
+        <developer>
+            <id>yufeldman</id>
+            <name>Yuliya Feldman</name>
+            <email>yufeldman@apache.org</email>
+            <roles>
+                <role>Committer</role>
+            </roles>
+        </developer>
     </developers>
 
     <modules>


[13/15] twill git commit: Run mvn in --batch-mode to reduce verbosity in travis build

Posted by ch...@apache.org.
Run mvn in --batch-mode to reduce verbosity in travis build

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/8f70aa4d
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/8f70aa4d
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/8f70aa4d

Branch: refs/heads/site
Commit: 8f70aa4d49243ef2a0ddc613ce2f3f9c22f80e97
Parents: 107dc1e
Author: Terence Yim <ch...@apache.org>
Authored: Sun Mar 18 22:21:34 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Mar 18 22:21:34 2018 -0700

----------------------------------------------------------------------
 .travis.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/8f70aa4d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index fae270d..55101b7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,9 +31,9 @@ branches:
     - /^branch\-.*$/
     - /^feature\/.*$/
 
-script: mvn test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
 
-install: mvn install -P $PROFILE -DskipTests=true
+install: mvn --batch-mode install -P $PROFILE -DskipTests=true
 
 env:
   - PROFILE='hadoop-2.0'


[06/15] twill git commit: (TWILL-255) Incorrect logging after memory was adjusted. Does not show memory before adjustment

Posted by ch...@apache.org.
(TWILL-255) Incorrect logging after memory was adjusted. Does not show memory before adjustment

This closes #66 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/00a844ad
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/00a844ad
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/00a844ad

Branch: refs/heads/site
Commit: 00a844adedd2b86d3c2ea55a58a4a743c1724aaf
Parents: d6095d4
Author: Yuliya Feldman <yu...@dremio.com>
Authored: Wed Feb 28 14:52:56 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 1 10:07:14 2018 -0800

----------------------------------------------------------------------
 .../org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java       | 2 +-
 .../org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/00a844ad/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 67bef3e..76de0c0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -126,8 +126,8 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
     updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
 
     if (resource.getMemory() != updatedMemory) {
-      resource.setMemory(updatedMemory);
       LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+      resource.setMemory(updatedMemory);
     }
 
     return resource;

http://git-wip-us.apache.org/repos/asf/twill/blob/00a844ad/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index f349b4e..42bff62 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -165,8 +165,8 @@ public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.Contai
 
     int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
     if (resource.getMemory() != updatedMemory) {
-      resource.setMemory(updatedMemory);
       LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+      resource.setMemory(updatedMemory);
     }
 
     return resource;


[14/15] twill git commit: (TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests

Posted by ch...@apache.org.
(TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests

- Fix a race condition in the LocationCacheTest
  - There is a small delay between the current timestamp taken in
    LocationCacheCleaner.start and the one taken in the test method.
- Fix race condition in LogLevelChangeTestRun
  - The test assumes that after the root logger level changes, the other logger levels also change
    in the resource report, which is not true
  - The test is not checking the log levels for all runnable instances

This closes #68 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/ee4d1370
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/ee4d1370
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/ee4d1370

Branch: refs/heads/site
Commit: ee4d13701b218305d034bfaa8474ef881995e65c
Parents: 8f70aa4
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 19 00:28:20 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Mar 20 12:54:58 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |  2 +-
 .../apache/twill/yarn/LocationCacheCleaner.java |  4 +-
 .../apache/twill/yarn/LocationCacheTest.java    | 11 +++-
 .../twill/yarn/LogLevelChangeTestRun.java       | 59 +++++++++++---------
 .../internal/zookeeper/InMemoryZKServer.java    | 18 ++----
 5 files changed, 51 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 55101b7..af74548 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@ branches:
     - /^branch\-.*$/
     - /^feature\/.*$/
 
-script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false
 
 install: mvn --batch-mode install -P $PROFILE -DskipTests=true
 

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index 0738218..fed76a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -144,7 +144,9 @@ final class LocationCacheCleaner extends AbstractIdleService {
               }
               // If the location is already pending for cleanup, this won't update the expire time as
               // the comparison of PendingCleanup is only by location.
-              pendingCleanups.add(new PendingCleanup(location, expireTime));
+              if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
+                LOG.debug("Pending deletion of location {} with expiration time at {}", location, expireTime);
+              }
             }
           }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 8ed9ae4..63ca28b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -109,12 +109,17 @@ public class LocationCacheTest {
     newTwillRunner.start();
 
     // Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
-    // collected when the new twill runner was started
+    // collected when the new twill runner was started.
+    // Need to add some time in addition to the antique expiry time because the cache cleaner collects
+    // pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
+    // later than the System.currentTimeMillis() call in the next line.
     ((YarnTwillRunnerService) newTwillRunner)
-      .forceLocationCacheCleanup(System.currentTimeMillis() + Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+      .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
+                                   Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
 
     // Now there shouldn't be any file under the current session cache directory
-    Assert.assertTrue(currentSessionCache.list().isEmpty());
+    List<Location> locations = currentSessionCache.list();
+    Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
index 6df6d11..a1d8ae6 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -172,20 +173,20 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
 
     // assert that log level is DEBUG
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     // change the log level to INFO
     controller.updateLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, LogEntry.Level.INFO)).get();
 
     // assert log level has changed to INFO
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     // change the log level of LogLevelTestRunnable to WARN,
     // change the log level of LogLevelTestSecondRunnable to TRACE
@@ -195,16 +196,16 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     controller.updateLogLevels(LogLevelTestSecondRunnable.class.getSimpleName(), logLevelSecondRunnable).get();
 
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE), 1);
 
     // change a particular logger to log level warn and reset it back.
     logLevelFirstRunnable = ImmutableMap.of("test", LogEntry.Level.WARN);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
                     20L, TimeUnit.SECONDS, LogEntry.Level.WARN,
-                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN));
+                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN), 1);
     logLevelFirstRunnable = new HashMap<>();
     logLevelFirstRunnable.put("test", null);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
@@ -212,13 +213,13 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     result.put("ROOT", LogEntry.Level.WARN);
     result.put("test", null);
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // reset the log level for a particular logger of LogLevelTestRunnable
     controller.resetRunnableLogLevels(LogLevelTestRunnable.class.getSimpleName(), "test").get();
     result.remove("test");
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // change the log level of LogLevelTestSecondRunnable to INFO and change instances of it to test if the log level
     // request get applied to container started up later
@@ -228,14 +229,14 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
     controller.changeInstances(LogLevelTestSecondRunnable.class.getSimpleName(), 2).get();
     TimeUnit.SECONDS.sleep(5);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(), 20L, TimeUnit.SECONDS,
-                    LogEntry.Level.INFO, logLevelSecondRunnable);
+                    LogEntry.Level.INFO, logLevelSecondRunnable, 2);
 
     // reset the log levels back to default.
     controller.resetLogLevels().get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 2);
 
     // stop
     controller.terminate().get(120, TimeUnit.SECONDS);
@@ -248,29 +249,37 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
   // could return null if the application has not fully started.
   private void waitForLogLevel(TwillController controller, String runnable, long timeout,
                                TimeUnit timeoutUnit, LogEntry.Level expected,
-                               Map<String, LogEntry.Level> expectedArgs) throws InterruptedException {
+                               Map<String, LogEntry.Level> expectedArgs,
+                               int expectedInstances) throws InterruptedException {
 
     Stopwatch stopwatch = new Stopwatch();
     stopwatch.start();
-    LogEntry.Level actual = null;
-    Map<String, LogEntry.Level> actualArgs = null;
-    boolean stopped = false;
-    do {
+    while (stopwatch.elapsedTime(timeoutUnit) < timeout) {
       ResourceReport report = controller.getResourceReport();
+
       if (report == null || report.getRunnableResources(runnable) == null) {
+        TimeUnit.MILLISECONDS.sleep(100);
         continue;
       }
+
+      int matchCount = 0;
       for (TwillRunResources resources : report.getRunnableResources(runnable)) {
-        actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
-        actualArgs = resources.getLogLevels();
-        if (actual != null && actual.equals(expected)) {
-          stopped = true;
-          break;
+        LogEntry.Level actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+        Map<String, LogEntry.Level> actualArgs = resources.getLogLevels();
+        if (Objects.equals(expected, actual) && Objects.equals(expectedArgs, actualArgs)) {
+          matchCount++;
+        } else {
+          LOG.info("Log levels not match for {}. {} != {} or {} != {}",
+                   runnable, expected, actual, expectedArgs, actualArgs);
         }
       }
+
+      if (matchCount == expectedInstances) {
+        return;
+      }
       TimeUnit.MILLISECONDS.sleep(100);
-    } while (!stopped && stopwatch.elapsedTime(timeoutUnit) < timeout);
-    Assert.assertEquals(expected, actual);
-    Assert.assertEquals(expectedArgs, actualArgs);
+    }
+
+    Assert.fail("Timeout waiting for expected log levels");
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
index f962b68..d18d5ed 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.zookeeper;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.Executor;
 
 /**
@@ -103,17 +101,11 @@ public final class InMemoryZKServer implements Service {
   }
 
   private InetSocketAddress getAddress(int port) {
-    try {
-      int socketPort = port < 0 ? 0 : port;
-      // This property is needed so that in certain CI environment (e.g. Travis-CI) it can only works properly if
-      // it is binded to the wildcard (0.0.0.0) address
-      if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
-        return new InetSocketAddress(InetAddress.getLocalHost(), socketPort);
-      } else {
-        return new InetSocketAddress(socketPort);
-      }
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
+    int socketPort = port < 0 ? 0 : port;
+    if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
+      return new InetSocketAddress(InetAddress.getLoopbackAddress(), socketPort);
+    } else {
+      return new InetSocketAddress(socketPort);
     }
   }
 


[08/15] twill git commit: Added the missing hadoop-2.6 profile in the twill-yarn module

Posted by ch...@apache.org.
Added the missing hadoop-2.6 profile in the twill-yarn module

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/7f494d13
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/7f494d13
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/7f494d13

Branch: refs/heads/site
Commit: 7f494d13da45e6f9a27dfe1b4bde9d521d306c20
Parents: af60a02
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 13:55:05 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Mar 9 13:55:05 2018 -0800

----------------------------------------------------------------------
 twill-yarn/pom.xml | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/7f494d13/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 9e99886..ae68a2e 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -178,6 +178,19 @@
             </build>
         </profile>
         <profile>
+            <id>hadoop-2.6</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+        <profile>
             <id>mapr-hadoop-2.4</id>
             <dependencies>
                 <dependency>


[11/15] twill git commit: Update ReleaseGuide to include latest version of Hadoop

Posted by ch...@apache.org.
Update ReleaseGuide to include latest version of Hadoop

Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/4b0cf5e7
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/4b0cf5e7
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/4b0cf5e7

Branch: refs/heads/site
Commit: 4b0cf5e7164dd12ad9d73b9420f6bc4e7a5667a1
Parents: 1c8fec8
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:50:26 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:50:26 2018 -0700

----------------------------------------------------------------------
 src/site/markdown/ReleaseGuide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/4b0cf5e7/src/site/markdown/ReleaseGuide.md
----------------------------------------------------------------------
diff --git a/src/site/markdown/ReleaseGuide.md b/src/site/markdown/ReleaseGuide.md
index 551d48d..0fd59a1 100644
--- a/src/site/markdown/ReleaseGuide.md
+++ b/src/site/markdown/ReleaseGuide.md
@@ -82,8 +82,8 @@ git push origin v${RELEASE_VERSION}
 #### Build the source tarball and publish artifacts to the staging repo
 ```
 mvn clean prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.0 &&
-mvn prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.3 &&
-mvn deploy -DskipTests -Dremoteresources.skip=true -P hadoop-2.3 -P apache-release
+mvn prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.6 &&
+mvn deploy -DskipTests -Dremoteresources.skip=true -P hadoop-2.6 -P apache-release
 ```
 The source tarball can be found in `target/apache-twill-${RELEASE_VERSION}-source-release.tar.gz`
 after the above command has successfully completed.


[15/15] twill git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/2a97588c
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/2a97588c
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/2a97588c

Branch: refs/heads/site
Commit: 2a97588ccf78f6782682635bab8a147f8b930f80
Parents: 4b0cf5e ee4d137
Author: Terence Yim <ch...@apache.org>
Authored: Wed Mar 21 22:19:38 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Mar 21 22:19:38 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 pom.xml                                         |  14 ++
 .../main/java/org/apache/twill/api/Configs.java |  12 ++
 .../twill/internal/AbstractTwillController.java |  18 ++
 .../twill/internal/AbstractTwillService.java    |  14 +-
 .../kafka/client/SimpleKafkaConsumer.java       |  19 +-
 .../kafka/client/SimpleKafkaPublisher.java      |  41 ++--
 .../internal/kafka/client/ZKBrokerService.java  |  39 ++--
 .../internal/yarn/Hadoop21YarnAppClient.java    |  17 +-
 .../internal/yarn/Hadoop23YarnAppClient.java    |   4 +-
 .../internal/yarn/Hadoop26YarnAppClient.java    |  48 +++++
 .../appmaster/ApplicationMasterMain.java        |   7 +-
 .../appmaster/ApplicationMasterService.java     |  10 +-
 .../yarn/VersionDetectYarnAppClientFactory.java |   6 +-
 .../apache/twill/yarn/LocationCacheCleaner.java |   4 +-
 .../apache/twill/yarn/YarnTwillController.java  |  11 ++
 .../apache/twill/yarn/AppRecoveryTestRun.java   | 189 +++++++++++++++++++
 .../apache/twill/yarn/LocationCacheTest.java    |  11 +-
 .../twill/yarn/LogLevelChangeTestRun.java       |  59 +++---
 .../internal/zookeeper/InMemoryZKServer.java    |  18 +-
 .../apache/twill/zookeeper/ZKOperations.java    |  72 +++++++
 21 files changed, 519 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/2a97588c/pom.xml
----------------------------------------------------------------------


[02/15] twill git commit: Extra commit to close PR

Posted by ch...@apache.org.
Extra commit to close PR

This closes #62 on Github


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/aa70499e
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/aa70499e
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/aa70499e

Branch: refs/heads/site
Commit: aa70499ea083a783cda4daf3261aec0383fb1aa6
Parents: 6d6b388
Author: Terence Yim <ch...@apache.org>
Authored: Mon Oct 30 15:25:44 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Oct 30 15:25:44 2017 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[10/15] twill git commit: Merge branch 'master' into site

Posted by ch...@apache.org.
Merge branch 'master' into site


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/1c8fec89
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/1c8fec89
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/1c8fec89

Branch: refs/heads/site
Commit: 1c8fec89207fe2875c688acbb8285e7acf848907
Parents: 49f3595 439c109
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:50:02 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:50:02 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   7 +-
 pom.xml                                         |  73 +++++-
 twill-core/pom.xml                              |  10 +-
 twill-yarn/pom.xml                              |  13 ++
 .../internal/yarn/Hadoop20YarnAMClient.java     |   9 +-
 .../internal/yarn/Hadoop20YarnNMClient.java     |  12 +-
 .../internal/yarn/Hadoop21YarnAMClient.java     |   9 +-
 .../internal/yarn/Hadoop21YarnNMClient.java     |  14 +-
 .../internal/yarn/Hadoop22YarnAMClient.java     |   2 +-
 .../internal/yarn/Hadoop26YarnAMClient.java     |  42 ++++
 .../internal/appmaster/TrackerService.java      | 231 ++++++++++---------
 .../internal/yarn/AbstractYarnAMClient.java     |  12 +-
 .../yarn/VersionDetectYarnAMClientFactory.java  |  10 +-
 .../apache/twill/internal/yarn/YarnUtils.java   |  13 +-
 .../apache/twill/yarn/ResourceReportClient.java |  36 ++-
 15 files changed, 351 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/1c8fec89/pom.xml
----------------------------------------------------------------------


[12/15] twill git commit: (TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed

Posted by ch...@apache.org.
(TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed

- Delete the Kafka root zk node for the application if it already exists
- Delete the AM instance zk node if it already exists
- For the runnables' parent zk node, it is not an error if it already exists
- Enhance KafkaClient publisher / consumer to deal with Kafka cluster changes
  - When the AM is killed and restarted, the embedded Kafka will be running on a different host and port

This closes #67 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/107dc1e2
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/107dc1e2
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/107dc1e2

Branch: refs/heads/site
Commit: 107dc1e20c63207695bb5f8b5f97186b6b3f9412
Parents: 439c109
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 12:21:26 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 15 16:13:34 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  14 ++
 .../main/java/org/apache/twill/api/Configs.java |  12 ++
 .../twill/internal/AbstractTwillController.java |  18 ++
 .../twill/internal/AbstractTwillService.java    |  14 +-
 .../kafka/client/SimpleKafkaConsumer.java       |  19 +-
 .../kafka/client/SimpleKafkaPublisher.java      |  41 ++--
 .../internal/kafka/client/ZKBrokerService.java  |  39 ++--
 .../internal/yarn/Hadoop21YarnAppClient.java    |  17 +-
 .../internal/yarn/Hadoop23YarnAppClient.java    |   4 +-
 .../internal/yarn/Hadoop26YarnAppClient.java    |  48 +++++
 .../appmaster/ApplicationMasterMain.java        |   7 +-
 .../appmaster/ApplicationMasterService.java     |  10 +-
 .../yarn/VersionDetectYarnAppClientFactory.java |   6 +-
 .../apache/twill/yarn/YarnTwillController.java  |  11 ++
 .../apache/twill/yarn/AppRecoveryTestRun.java   | 189 +++++++++++++++++++
 .../apache/twill/zookeeper/ZKOperations.java    |  72 +++++++
 16 files changed, 467 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 84c4b8c..45aa64d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,7 @@
         <jopt-simple.version>3.2</jopt-simple.version>
         <commons-compress.version>1.5</commons-compress.version>
         <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+        <force.mac.tests>false</force.mac.tests>
     </properties>
 
     <scm>
@@ -346,6 +347,7 @@
                     <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
                     <systemPropertyVariables>
                         <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+                        <force.mac.tests>${force.mac.tests}</force.mac.tests>
                     </systemPropertyVariables>
                     <reuseForks>false</reuseForks>
                     <reportFormat>plain</reportFormat>
@@ -362,6 +364,18 @@
 
     <profiles>
         <profile>
+            <!--
+                This profile is to force certain tests to run on Mac.
+                Those tests are disabled due to orphan processes left after the test run (HADOOP-12317).
+                If this profile is enabled, after the test finished, run the `jps` command
+                and delete all `TwillLauncher` processes
+            -->
+            <id>force-mac-tests</id>
+            <properties>
+                <force.mac.tests>true</force.mac.tests>
+            </properties>
+        </profile>
+        <profile>
             <id>apache-release</id>
             <build>
                 <plugins>

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-api/src/main/java/org/apache/twill/api/Configs.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 9a21489..20a25f6 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -79,6 +79,18 @@ public final class Configs {
     public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
 
     /**
+     * Maximum number of attempts to run the application by YARN if there is failure.
+     */
+    public static final String YARN_MAX_APP_ATTEMPTS = "twill.yarn.max.app.attempts";
+
+    /**
+     * Interval time in milliseconds for the attempt failures validity interval in YARN. YARN only limit to
+     * the maximum attempt count for failures in the given interval.
+     */
+    public static final String YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL =
+      "twill.yarn.attempt.failures.validity.interval";
+
+    /**
      * Setting for enabling log collection.
      */
     public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index fd8a939..0ff2fc8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -222,6 +222,24 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
   }
 
+  /**
+   * Reset the log handler to poll from the beginning of Kafka.
+   */
+  protected final synchronized void resetLogHandler() {
+    if (kafkaClient == null) {
+      return;
+    }
+    if (logCancellable != null) {
+      logCancellable.cancel();
+      logCancellable = null;
+    }
+    if (!logHandlers.isEmpty()) {
+      logCancellable = kafkaClient.getConsumer().prepare()
+        .addFromBeginning(Constants.LOG_TOPIC, 0)
+        .consume(new LogMessageCallback(logHandlers));
+    }
+  }
+
   private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
     ResourceReport resourceReport = getResourceReport();
     if (resourceReport == null) {

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8e73653..425cd43 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -206,7 +206,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
   }
 
   /**
-   * Update the live node for the runnable.
+   * Update the live node for the service.
    *
    * @return A {@link OperationFuture} that will be completed when the update is done.
    */
@@ -216,11 +216,15 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture<String> createLiveNode() {
-    String liveNodePath = getLiveNodePath();
-    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final String liveNodePath = getLiveNodePath();
+    LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath);
+    return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true);
   }
 
   private OperationFuture<String> removeLiveNode() {

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 73235b7..f69350e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -340,17 +340,17 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
           continue;
         }
 
-        // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
-        // from kafak server.
-        long off = offset.get();
-        if (off < 0) {
-          offset.set(getLastOffset(topicPart, off));
-        }
+        try {
+          // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
+          // from kafak server.
+          long off = offset.get();
+          if (off < 0) {
+            offset.set(getLastOffset(topicPart, off));
+          }
 
-        SimpleConsumer consumer = consumerEntry.getValue();
+          SimpleConsumer consumer = consumerEntry.getValue();
 
-        // Fire a fetch message request
-        try {
+          // Fire a fetch message request
           FetchResponse response = fetchMessages(consumer, offset.get());
 
           // Failure response, set consumer entry to null and let next round of loop to handle it.
@@ -364,6 +364,7 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
 
             consumers.refresh(consumerEntry.getKey());
             consumerEntry = null;
+            backoff.backoff();
             continue;
           }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index f147d24..e5d0f8d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -54,11 +54,11 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
   private final AtomicReference<Producer<Integer, ByteBuffer>> producer;
   private final AtomicBoolean listenerCancelled;
 
-  public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
+  SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
     this.brokerService = brokerService;
     this.ack = ack;
     this.compression = compression;
-    this.producer = new AtomicReference<Producer<Integer, ByteBuffer>>();
+    this.producer = new AtomicReference<>();
     this.listenerCancelled = new AtomicBoolean(false);
   }
 
@@ -107,7 +107,7 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
 
     @Override
     public Preparer add(ByteBuffer message, Object partitionKey) {
-      messages.add(new KeyedMessage<Integer, ByteBuffer>(topic, Math.abs(partitionKey.hashCode()), message));
+      messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message));
       return this;
     }
 
@@ -159,30 +159,37 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
       }
 
       String newBrokerList = brokerService.getBrokerList();
-      if (newBrokerList.isEmpty()) {
-        LOG.warn("Broker list is empty. No Kafka producer is created.");
-        return;
-      }
 
+      // If there is no change, whether it is empty or not, just return
       if (Objects.equal(brokerList, newBrokerList)) {
         return;
       }
 
-      Properties props = new Properties();
-      props.put("metadata.broker.list", newBrokerList);
-      props.put("serializer.class", ByteBufferEncoder.class.getName());
-      props.put("key.serializer.class", IntegerEncoder.class.getName());
-      props.put("partitioner.class", IntegerPartitioner.class.getName());
-      props.put("request.required.acks", Integer.toString(ack.getAck()));
-      props.put("compression.codec", compression.getCodec());
+      Producer<Integer, ByteBuffer> newProducer = null;
+      if (!newBrokerList.isEmpty()) {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", newBrokerList);
+        props.put("serializer.class", ByteBufferEncoder.class.getName());
+        props.put("key.serializer.class", IntegerEncoder.class.getName());
+        props.put("partitioner.class", IntegerPartitioner.class.getName());
+        props.put("request.required.acks", Integer.toString(ack.getAck()));
+        props.put("compression.codec", compression.getCodec());
+
+        ProducerConfig config = new ProducerConfig(props);
+        newProducer = new Producer<>(config);
+      }
 
-      ProducerConfig config = new ProducerConfig(props);
-      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
+      // If the broker list is empty, the producer will be set to null
+      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
       if (oldProducer != null) {
         oldProducer.close();
       }
 
-      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+      if (newBrokerList.isEmpty()) {
+        LOG.warn("Empty Kafka producer broker list, publish will fail.");
+      } else {
+        LOG.info("Updated Kafka producer broker list: {}", newBrokerList);
+      }
       brokerList = newBrokerList;
     }
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
index 2ffc604..de42b9b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
@@ -51,6 +51,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 
 /**
  * A {@link BrokerService} that watches kafka zk nodes for updates of broker lists and leader for
@@ -136,11 +139,13 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
       return brokerList.get();
     }
 
-    final SettableFuture<?> readerFuture = SettableFuture.create();
-    final AtomicReference<Iterable<BrokerInfo>> brokers =
-      new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());
+    final SettableFuture<?> readyFuture = SettableFuture.create();
+    final AtomicReference<List<BrokerInfo>> brokers = new AtomicReference<>(Collections.<BrokerInfo>emptyList());
 
     actOnExists(BROKER_IDS_PATH, new Runnable() {
+
+      final Runnable thisRunnable = this;
+
       @Override
       public void run() {
         // Callback for fetching children list. This callback should be executed in the executorService.
@@ -154,19 +159,19 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
                   Iterables.transform(
                     brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
                     Suppliers.<BrokerInfo>supplierFunction())));
-              readerFuture.set(null);
+              readyFuture.set(null);
 
               for (ListenerExecutor listener : listeners) {
                 listener.changed(ZKBrokerService.this);
               }
             } catch (ExecutionException e) {
-              readerFuture.setException(e.getCause());
+              readyFuture.setException(e.getCause());
             }
           }
 
           @Override
           public void onFailure(Throwable t) {
-            readerFuture.setException(t);
+            readyFuture.setException(t);
           }
         };
 
@@ -179,15 +184,25 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
             }
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
               Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
+            } else if (event.getType() == Event.EventType.NodeDeleted) {
+              // If the ids node is deleted, clear the broker list and re-watch.
+              // This could happen when the Kafka server is restarted and have the ZK node cleanup
+              // The readyFuture for this call doesn't matter, as we don't need to block on anything
+              brokers.set(Collections.<BrokerInfo>emptyList());
+              for (ListenerExecutor listener : listeners) {
+                listener.changed(ZKBrokerService.this);
+              }
+              actOnExists(BROKER_IDS_PATH, thisRunnable, SettableFuture.create(),
+                          FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
             }
           }
         }), childrenCallback, executorService);
       }
-    }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
+    }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
 
-    brokerList = createSupplier(brokers);
+    brokerList = this.<Iterable<BrokerInfo>>createSupplier(brokers);
     try {
-      readerFuture.get();
+      readyFuture.get();
     } catch (Exception e) {
       throw Throwables.propagate(e);
     }
@@ -223,7 +238,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
       public Supplier<T> load(final K key) throws Exception {
         // A future to tell if the result is ready, even it is failure.
         final SettableFuture<T> readyFuture = SettableFuture.create();
-        final AtomicReference<T> resultValue = new AtomicReference<T>();
+        final AtomicReference<T> resultValue = new AtomicReference<>();
 
         // Fetch for node data when it exists.
         final String path = key.getPath();
@@ -312,7 +327,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
       }
     }), new FutureCallback<Stat>() {
       @Override
-      public void onSuccess(Stat result) {
+      public void onSuccess(@Nullable Stat result) {
         if (result != null) {
           action.run();
         } else {
@@ -345,7 +360,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
   /**
    * Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}.
    */
-  private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) {
+  private <T> Supplier<T> createSupplier(final AtomicReference<? extends T> ref) {
     return new Supplier<T>() {
       @Override
       public T get() {

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index aa14a75..c219171 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -59,7 +59,7 @@ import javax.annotation.Nullable;
 public class Hadoop21YarnAppClient implements YarnAppClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
-  private final Configuration configuration;
+  protected final Configuration configuration;
 
   public Hadoop21YarnAppClient(Configuration configuration) {
     this.configuration = configuration;
@@ -108,7 +108,7 @@ public class Hadoop21YarnAppClient implements YarnAppClient {
             addRMToken(launchContext, yarnClient, appId);
             appSubmissionContext.setAMContainerSpec(launchContext);
             appSubmissionContext.setResource(capability);
-            appSubmissionContext.setMaxAppAttempts(2);
+            configureAppSubmissionContext(appSubmissionContext);
 
             yarnClient.submitApplication(appSubmissionContext);
             return new ProcessControllerImpl(appId);
@@ -126,6 +126,19 @@ public class Hadoop21YarnAppClient implements YarnAppClient {
     }
   }
 
+  /**
+   * Sets max app attempts from {@link Configs.Keys#YARN_MAX_APP_ATTEMPTS}, or 2 if unset or not positive.
+   */
+  protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+    int maxAttempts = configuration.getInt(Configs.Keys.YARN_MAX_APP_ATTEMPTS, -1);
+    if (maxAttempts > 0) {
+      context.setMaxAppAttempts(maxAttempts);
+    } else {
+      // Preserve the previous behavior of always using 2 attempts
+      context.setMaxAppAttempts(2);
+    }
+  }
+
   private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
     int maxMemory = response.getMaximumResourceCapability().getMemory();
     int updatedMemory = capability.getMemory();

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
index 97d2a64..0e3382f 100644
--- a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
@@ -48,14 +48,12 @@ import java.util.List;
  * </p>
  */
 @SuppressWarnings("unused")
-public final class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
+public class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop23YarnAppClient.class);
-  private final Configuration configuration;
 
   public Hadoop23YarnAppClient(Configuration configuration) {
     super(configuration);
-    this.configuration = configuration;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
new file mode 100644
index 0000000..1c27518
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.twill.api.Configs;
+
+/**
+ * The {@link YarnAppClient} implementation for Apache Hadoop 2.6 and beyond. After applying the superclass settings,
+ * it also sets the attempt failures validity interval on the {@link ApplicationSubmissionContext} when
+ * {@link Configs.Keys#YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL} is configured with a positive value.
+ * <p>
+ * {@link VersionDetectYarnAppClientFactory} returns an instance of this class for Hadoop versions above 2.3.
+ * </p>
+ */
+@SuppressWarnings("unused")
+public class Hadoop26YarnAppClient extends Hadoop23YarnAppClient {
+
+  public Hadoop26YarnAppClient(Configuration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+    super.configureAppSubmissionContext(context);
+    long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L);
+    if (interval > 0) {  // only override when explicitly configured with a positive value
+      context.setAttemptFailuresValidityInterval(interval);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 445656d..7706d52 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -165,9 +165,10 @@ public final class ApplicationMasterMain extends ServiceMain {
 
     @Override
     protected void startUp() throws Exception {
-      ZKOperations.ignoreError(
-        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
-        KeeperException.NodeExistsException.class, kafkaZKPath).get();
+      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+      // no left over content from previous AM attempt.
+      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+      ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
       kafkaServer.startAndWait();
     }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 6fc31f5..8a80041 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -78,7 +78,9 @@ import org.apache.twill.internal.yarn.YarnContainerStatus;
 import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -318,8 +320,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
-    // Creates ZK path for runnable
-    zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
+    // Creates ZK path for runnable. It's ok if the path already exists.
+    // That's for the case when the AM get killed and restarted
+    ZKOperations.ignoreError(
+      zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+      KeeperException.NodeExistsException.class, null)
+      .get();
     runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
     runnableContainerRequests = initContainerRequests();
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index c8e88c9..83de2a4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -39,9 +39,13 @@ public final class VersionDetectYarnAppClientFactory implements YarnAppClientFac
           // 2.1 and 2.2 uses the same YarnAppClient
           clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
           break;
-        default:
+        case HADOOP_23:
           // 2.3 and above uses the 2.3 YarnAppClient to support RM HA
           clzName = getClass().getPackage().getName() + ".Hadoop23YarnAppClient";
+          break;
+        default:
+          // Anything above 2.3 will be 2.6 and beyond
+          clzName = getClass().getPackage().getName() + ".Hadoop26YarnAppClient";
       }
       Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
       return clz.getConstructor(Configuration.class).newInstance(configuration);

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 335d7ec..8f844a2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,6 +69,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill
   private final TimeUnit startTimeoutUnit;
   private volatile ApplicationMasterLiveNodeData amLiveNodeData;
   private ProcessController<YarnApplicationReport> processController;
+  private ApplicationAttemptId currentAttemptId;
 
   // Thread for polling yarn for application status if application got ZK session expire.
   // Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback.
@@ -141,6 +143,8 @@ final class YarnTwillController extends AbstractTwillController implements Twill
         LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId);
         forceShutDown();
       }
+
+      currentAttemptId = report.getCurrentApplicationAttemptId();
     } catch (Exception e) {
       throw Throwables.propagate(e);
     }
@@ -273,6 +277,13 @@ final class YarnTwillController extends AbstractTwillController implements Twill
               shutdown = true;
               break;
             }
+            ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
+            if (currentAttemptId.compareTo(attemptId) != 0) {
+              LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId);
+              currentAttemptId = attemptId;
+              resetLogHandler();
+            }
+
             // Make a sync exists call to instance node and re-watch if the node exists
             try {
               // The timeout is arbitrary, as it's just for avoiding block forever

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
new file mode 100644
index 0000000..4f5adce
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that an application, including its log collection, keeps working after the application master restarts.
+ */
+public class AppRecoveryTestRun extends BaseYarnTest {
+
+  @ClassRule
+  public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testAMRestart() throws Exception {
+    // Only run with Hadoop 2.1 or above
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0);
+    // Don't run this test on Mac, as it leaves a leftover java process behind (HADOOP-12317).
+    // The test can be forced to run with the "force-mac-tests" maven profile or -Dforce.mac.tests=true.
+    // After the test finishes, run the `jps` command and kill all leftover `TwillLauncher` processes.
+    Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) ||
+                      !System.getProperty("os.name").toLowerCase().contains("mac"));
+
+    File watchFile = TEMP_FOLDER.newFile();
+    watchFile.delete();  // only a unique path is needed; creating the file later triggers the AM kill
+
+    // Start the test app and wait for 2 matching log lines per attempt (4 in total): the "Container ... launched"
+    // line emitted by the event handler (AM) and the first "Running" line emitted by the runnable
+    final Semaphore semaphore = new Semaphore(0);
+    TwillRunner runner = getTwillRunner();
+    TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile)))
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      // Use a log handler to match messages from the AM and the runnable, to make sure log collection resumes
+      // correctly after the AM restarts
+      .addLogHandler(new LogHandler() {
+        @Override
+        public void onLog(LogEntry logEntry) {
+          String message = logEntry.getMessage();
+          if (message.equals("Container for " + TestRunnable.class.getSimpleName() + " launched")) {
+            semaphore.release();
+          } else if (message.equals("Running 0")) {
+            semaphore.release();
+          }
+        }
+      })
+      .start();
+
+    // Wait for the first attempt to be running
+    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+    // Create the watchFile so that the event handler halts the AM of the first attempt
+    Files.touch(watchFile);
+    // Wait for the second attempt to be running
+    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+
+    controller.terminate().get();
+  }
+
+  /**
+   * An {@link EventHandler} that kills the AM of the first application attempt once the watch file is created.
+   */
+  public static final class TestEventHandler extends EventHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class);
+
+    private File watchFile;
+
+    TestEventHandler(File watchFile) {
+      this.watchFile = watchFile;
+    }
+
+    @Override
+    public void containerLaunched(String runnableName, int instanceId, String containerId) {
+      LOG.info("Container for {} launched", runnableName);
+
+      if (containerId.contains("_01_")) {  // presumably the attempt segment of first-attempt container IDs
+        final File watchFile = new File(context.getSpecification().getConfigs().get("watchFile"));
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            // Poll until the watch file exists, then halt the JVM to simulate an AM crash
+            while (!watchFile.exists()) {
+              Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+            Runtime.getRuntime().halt(-1);
+          }
+        };
+        t.setDaemon(true);
+        t.start();
+      }
+    }
+
+    @Override
+    protected Map<String, String> getConfigs() {
+      return Collections.singletonMap("watchFile", watchFile.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Test application with a single {@link TestRunnable} and the given {@link EventHandler}.
+   */
+  public static final class TestApp implements TwillApplication {
+
+    private final EventHandler eventHandler;
+
+    public TestApp(EventHandler eventHandler) {
+      this.eventHandler = eventHandler;
+    }
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("TestApp")
+        .withRunnable()
+        .add(new TestRunnable()).noLocalFiles()
+        .anyOrder()
+        .withEventHandler(eventHandler).build();
+    }
+  }
+
+  /**
+   * Runnable that logs {@code Running <count>} about every 2 seconds until {@link #stop()} is called.
+   */
+  public static final class TestRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class);
+
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      long count = 0;
+      try {
+        while (!stopLatch.await(2, TimeUnit.SECONDS)) {
+          LOG.info("Running {}", count++);
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted", e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      stopLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
index 0e2239d..bce6391 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -25,16 +25,21 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
 
 /**
  * Collection of helper methods for common operations that usually needed when interacting with ZooKeeper.
@@ -282,6 +287,73 @@ public final class ZKOperations {
   }
 
   /**
+   * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen
+   * and the creation will be retried.
+   *
+   * @param zkClient the {@link ZKClient} used for both the create and the recursive delete
+   * @param path the path of the node to create
+   * @param data the data to store in the node, or {@code null}
+   * @param createMode the {@link CreateMode} of the node
+   * @param createParent passed through to {@link ZKClient} to control creation of missing parent nodes
+   * @param acls the ACLs for the new node; if none are given, {@link ZooDefs.Ids#OPEN_ACL_UNSAFE} is used
+   * @return an {@link OperationFuture} that completes with the result of the successful create call. It fails with
+   *         the create failure if that failure is not a {@link KeeperException.NodeExistsException}. It also fails
+   *         with the original {@link KeeperException.NodeExistsException}, carrying the delete failure as a
+   *         suppressed exception, if deleting the existing node fails for a reason other than
+   *         {@link KeeperException.NoNodeException}.
+   */
+  public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
+                                                             @Nullable final byte[] data, final CreateMode createMode,
+                                                             final boolean createParent, final ACL...acls) {
+    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
+                                                                                        Threads.SAME_THREAD_EXECUTOR);
+    final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+    createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
+
+      // Captured so the nested delete callback below can re-issue the create with this same callback,
+      // which is what makes the delete-then-retry loop possible.
+      // NOTE(review): retries are unbounded; if another client keeps re-creating the node this keeps looping.
+      final FutureCallback<String> createCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        // Create succeeded, just set the result to the resultFuture
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        // If create failed not because of the NodeExistsException, just set the exception to the result future
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+          return;
+        }
+
+        // Try to delete the path
+        LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+        Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String result) {
+            // If delete succeeded, perform the creation again.
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // If deletion failed for any reason other than NoNodeException, fail the result future with the
+            // original NodeExistsException and attach the deletion failure as a suppressed exception.
+            if (!(t instanceof KeeperException.NoNodeException)) {
+              createFailure.addSuppressed(t);
+              resultFuture.setException(createFailure);
+              return;
+            }
+
+            // If can't delete because the node no longer exists, just go ahead and recreate the node
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    });
+
+    return resultFuture;
+  }
+
+  /**
+   * Starts an asynchronous creation of a ZK node. Nothing is returned to the caller; the success or failure of
+   * the creation is delivered only to the supplied {@link FutureCallback}, which is attached using
+   * {@link Threads#SAME_THREAD_EXECUTOR}.
+   *
+   * @param zkClient the client that performs the create
+   * @param path the path of the node to create
+   * @param data the node data, or {@code null}
+   * @param createMode the {@link CreateMode} of the node
+   * @param createParent passed through to {@link ZKClient} to control creation of missing parent nodes
+   * @param acls the ACLs to apply to the new node
+   * @param callback receives the create result or the create failure
+   */
+  private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+                                 CreateMode createMode, boolean createParent,
+                                 Iterable<ACL> acls, FutureCallback<String> callback) {
+    OperationFuture<String> createFuture = zkClient.create(path, data, createMode, createParent, acls);
+    Futures.addCallback(createFuture, callback, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  /**
    * Watch for the given path until it exists.
    * @param zkClient The {@link ZKClient} to use.
    * @param path A ZooKeeper path to watch for existent.