You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2018/03/22 05:20:05 UTC
[01/15] twill git commit: (TWILL-248) Upgrade to use Netty-4.1
Repository: twill
Updated Branches:
refs/heads/site 49f35954d -> 2a97588cc
(TWILL-248) Upgrade to use Netty-4.1
- Also enable ResourceReportClient to use HTTP compression
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6d6b3882
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6d6b3882
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6d6b3882
Branch: refs/heads/site
Commit: 6d6b3882a631e2260865f641674ee362c52fd1a0
Parents: f34a39a
Author: Terence Yim <ch...@apache.org>
Authored: Tue Oct 10 13:26:11 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Oct 16 09:10:18 2017 -0700
----------------------------------------------------------------------
pom.xml | 14 +-
twill-core/pom.xml | 10 +-
.../internal/appmaster/TrackerService.java | 217 ++++++++++---------
.../apache/twill/yarn/ResourceReportClient.java | 36 ++-
4 files changed, 159 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6aa474..573936d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@
<guava.version>13.0.1</guava.version>
<gson.version>2.2.4</gson.version>
<findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
- <netty.version>3.6.6.Final</netty.version>
+ <netty.version>4.1.16.Final</netty.version>
<snappy-java.version>1.0.5</snappy-java.version>
<jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
<asm.version>5.0.2</asm.version>
@@ -777,7 +777,17 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index f265f26..4bee172 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -55,7 +55,15 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index f91efcc..bb8cf57 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -20,38 +20,36 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import io.netty.util.ReferenceCountUtil;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.internal.json.ResourceReportAdapter;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +60,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@@ -78,14 +74,15 @@ public final class TrackerService extends AbstractIdleService {
private static final Logger LOG = LoggerFactory.getLogger(TrackerService.class);
private static final int NUM_BOSS_THREADS = 1;
+ private static final int NUM_WORKER_THREADS = 10;
private static final int CLOSE_CHANNEL_TIMEOUT = 5;
private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
private final Supplier<ResourceReport> resourceReport;
- private final ChannelGroup channelGroup;
private String host;
private ServerBootstrap bootstrap;
+ private Channel serverChannel;
private InetSocketAddress bindAddress;
private URL url;
@@ -95,7 +92,6 @@ public final class TrackerService extends AbstractIdleService {
* @param resourceReport live report that the service will return to clients.
*/
TrackerService(Supplier<ResourceReport> resourceReport) {
- this.channelGroup = new DefaultChannelGroup("appMasterTracker");
this.resourceReport = resourceReport;
}
@@ -123,52 +119,40 @@ public final class TrackerService extends AbstractIdleService {
@Override
protected void startUp() throws Exception {
- Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("boss-thread")
- .build());
-
- Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("worker-thread#%d")
- .build());
-
- ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
-
- bootstrap = new ServerBootstrap(factory);
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("compressor", new HttpContentCompressor());
- pipeline.addLast("handler", new ReportHandler());
-
- return pipeline;
- }
- });
-
- Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
- bindAddress = (InetSocketAddress) channel.getLocalAddress();
+ EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
+ new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("boss-thread").build());
+ EventLoopGroup workerGroup = new NioEventLoopGroup(NUM_WORKER_THREADS,
+ new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("worker-thread#%d").build());
+
+ bootstrap = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("codec", new HttpServerCodec());
+ pipeline.addLast("compressor", new HttpContentCompressor());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_INPUT_SIZE));
+ pipeline.addLast("handler", new ReportHandler());
+ }
+ });
+
+ serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+ bindAddress = (InetSocketAddress) serverChannel.localAddress();
url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
- channelGroup.add(channel);
LOG.info("Tracker service started at {}", url);
}
@Override
protected void shutDown() throws Exception {
- try {
- if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
- LOG.warn("Timeout when closing all channels.");
- }
- } finally {
- bootstrap.releaseExternalResources();
- }
+ serverChannel.close().await();
+ bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+ bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+
LOG.info("Tracker service stopped at {}", url);
}
@@ -176,7 +160,7 @@ public final class TrackerService extends AbstractIdleService {
* Handler to return resources used by this application master, which will be available through
* the host and port set when this application master registered itself to the resource manager.
*/
- final class ReportHandler extends SimpleChannelUpstreamHandler {
+ final class ReportHandler extends ChannelInboundHandlerAdapter {
private final ResourceReportAdapter reportAdapter;
ReportHandler() {
@@ -184,51 +168,68 @@ public final class TrackerService extends AbstractIdleService {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != HttpMethod.GET) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
- response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.wrappedBuffer("Only GET is supported".getBytes(StandardCharsets.UTF_8)));
- writeResponse(e.getChannel(), response);
- return;
- }
-
- if (!PATH.equals(request.getUri())) {
- // Redirect all GET call to the /resources path.
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
- response.setHeader(HttpHeaders.Names.LOCATION, PATH);
- writeResponse(e.getChannel(), response);
- return;
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ if (!(msg instanceof HttpRequest)) {
+ // Ignore if it is not HttpRequest
+ return;
+ }
+
+ HttpRequest request = (HttpRequest) msg;
+ if (!HttpMethod.GET.equals(request.method())) {
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED,
+ Unpooled.copiedBuffer("Only GET is supported", StandardCharsets.UTF_8));
+
+ HttpUtil.setContentLength(response, response.content().readableBytes());
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ writeAndClose(ctx.channel(), response);
+ return;
+ }
+
+ if (!PATH.equals(request.uri())) {
+ // Redirect all GET call to the /resources path.
+ HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+ HttpResponseStatus.TEMPORARY_REDIRECT);
+ HttpUtil.setContentLength(response, 0);
+ response.headers().set(HttpHeaderNames.LOCATION, PATH);
+ writeAndClose(ctx.channel(), response);
+ return;
+ }
+
+ writeResourceReport(ctx.channel());
+ } finally {
+ ReferenceCountUtil.release(msg);
}
+ }
- writeResourceReport(e.getChannel());
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ ctx.channel().close();
}
private void writeResourceReport(Channel channel) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
-
- ChannelBuffer content = ChannelBuffers.dynamicBuffer();
- Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
- reportAdapter.toJson(resourceReport.get(), writer);
+ ByteBuf content = Unpooled.buffer();
+ Writer writer = new OutputStreamWriter(new ByteBufOutputStream(content), CharsetUtil.UTF_8);
try {
+ reportAdapter.toJson(resourceReport.get(), writer);
writer.close();
- } catch (IOException e1) {
- LOG.error("error writing resource report", e1);
+ } catch (IOException e) {
+ LOG.error("error writing resource report", e);
+ writeAndClose(channel, new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ Unpooled.copiedBuffer(e.getMessage(), StandardCharsets.UTF_8)));
+ return;
}
- response.setContent(content);
- writeResponse(channel, response);
- }
- private void writeResponse(Channel channel, HttpResponse response) {
- ChannelFuture future = channel.write(response);
- future.addListener(ChannelFutureListener.CLOSE);
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+ HttpUtil.setContentLength(response, content.readableBytes());
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
+ channel.writeAndFlush(response);
}
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- e.getChannel().close();
+ private void writeAndClose(Channel channel, HttpResponse response) {
+ channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/6d6b3882/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
index fb8b7e8..4676751 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -17,19 +17,21 @@
*/
package org.apache.twill.yarn;
-import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.internal.json.ResourceReportAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.zip.DeflaterInputStream;
+import java.util.zip.GZIPInputStream;
/**
* Package private class to get {@link ResourceReport} from the application master.
@@ -52,12 +54,16 @@ final class ResourceReportClient {
public ResourceReport get() {
for (URL url : resourceUrls) {
try {
- Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8));
- try {
+ HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
+ urlConn.setRequestProperty("Accept-Encoding", "gzip, deflate");
+
+ if (urlConn.getResponseCode() != 200) {
+ continue;
+ }
+
+ try (Reader reader = new InputStreamReader(getInputStream(urlConn), StandardCharsets.UTF_8)) {
LOG.trace("Report returned by {}", url);
return reportAdapter.fromJson(reader);
- } finally {
- Closeables.closeQuietly(reader);
}
} catch (IOException e) {
// Just log a trace as it's ok to not able to fetch resource report
@@ -66,4 +72,20 @@ final class ResourceReportClient {
}
return null;
}
+
+ private InputStream getInputStream(HttpURLConnection urlConn) throws IOException {
+ InputStream is = urlConn.getInputStream();
+ String contentEncoding = urlConn.getContentEncoding();
+ if (contentEncoding == null) {
+ return is;
+ }
+ if ("gzip".equalsIgnoreCase(contentEncoding)) {
+ return new GZIPInputStream(is);
+ }
+ if ("deflate".equalsIgnoreCase(contentEncoding)) {
+ return new DeflaterInputStream(is);
+ }
+ // This should never happen
+ throw new IOException("Unsupported content encoding " + contentEncoding);
+ }
}
[04/15] twill git commit: (TWILL-251) Reduce log level of YarnNMClient
Posted by ch...@apache.org.
(TWILL-251) Reduce log level of YarnNMClient
- Also reduce the polling frequency
This closes #64 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/55f6d6fc
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/55f6d6fc
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/55f6d6fc
Branch: refs/heads/site
Commit: 55f6d6fc9b088e1fa9a1462a92146d608bbebb22
Parents: b7785bd
Author: Terence Yim <ch...@apache.org>
Authored: Fri Dec 1 13:21:07 2017 -0600
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Dec 4 10:21:05 2017 -0800
----------------------------------------------------------------------
.../twill/internal/yarn/Hadoop20YarnNMClient.java | 12 ++++++++----
.../twill/internal/yarn/Hadoop21YarnNMClient.java | 14 +++++++++-----
2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/55f6d6fc/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
index b43e4e1..e8628da 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -18,6 +18,7 @@
package org.apache.twill.internal.yarn;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -102,14 +104,16 @@ public final class Hadoop20YarnNMClient implements YarnNMClient {
stopRequest.setContainerId(container.getId());
try {
manager.stopContainer(stopRequest);
- boolean completed = false;
- while (!completed) {
+ while (true) {
GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
statusRequest.setContainerId(container.getId());
GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
- LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+ LOG.trace("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
- completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
+ if (statusResponse.getStatus().getState() == ContainerState.COMPLETE) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}
LOG.info("Container {} stopped.", container.getId());
} catch (YarnRemoteException e) {
http://git-wip-us.apache.org/repos/asf/twill/blob/55f6d6fc/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
index 8c5f0fc..dcdeb70 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.yarn;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -29,6 +30,8 @@ import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
/**
* Wrapper class for {@link NMClient} for Hadoop version 2.1 or greater.
*/
@@ -82,12 +85,13 @@ public final class Hadoop21YarnNMClient extends AbstractIdleService implements Y
try {
nmClient.stopContainer(container.getId(), container.getNodeId());
- boolean completed = false;
- while (!completed) {
+ while (true) {
ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
- LOG.info("Container status: {} {}", status, status.getDiagnostics());
-
- completed = (status.getState() == ContainerState.COMPLETE);
+ LOG.trace("Container status: {} {}", status, status.getDiagnostics());
+ if (status.getState() == ContainerState.COMPLETE) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}
LOG.info("Container {} stopped.", container.getId());
} catch (Exception e) {
[07/15] twill git commit: - Added hadoop-2.6 profile to travis - Also runs the Java8 tests with hadoop-2.6
Posted by ch...@apache.org.
- Added hadoop-2.6 profile to travis
- Also runs the Java8 tests with hadoop-2.6
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/af60a021
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/af60a021
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/af60a021
Branch: refs/heads/site
Commit: af60a0215c43d1e8a0853b2ff4b65ed278f06290
Parents: 00a844a
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 12:31:12 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Mar 9 12:31:12 2018 -0800
----------------------------------------------------------------------
.travis.yml | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/af60a021/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 93cf462..fae270d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,9 +41,10 @@ env:
- PROFILE='hadoop-2.2'
- PROFILE='hadoop-2.4'
- PROFILE='hadoop-2.5'
+ - PROFILE='hadoop-2.6'
- PROFILE='cdh-4.4.0'
- PROFILE='mapr-hadoop-2.4'
- - PROFILE='hadoop-2.5,java8-test'
+ - PROFILE='hadoop-2.6,java8-test'
# Only runs JDK8 on hadoop-2.5 profile
matrix:
@@ -59,11 +60,13 @@ matrix:
- jdk: oraclejdk8
env: PROFILE='hadoop-2.5'
- jdk: oraclejdk8
+ env: PROFILE='hadoop-2.6'
+ - jdk: oraclejdk8
env: PROFILE='cdh-4.4.0'
- jdk: oraclejdk8
env: PROFILE='mapr-hadoop-2.4'
- jdk: oraclejdk7
- env: PROFILE='hadoop-2.5,java8-test'
+ env: PROFILE='hadoop-2.6,java8-test'
sudo: false
[05/15] twill git commit: (TWILL-254) Update to use ContainerId.fromString
Posted by ch...@apache.org.
(TWILL-254) Update to use ContainerId.fromString
This closes #65 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/d6095d48
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/d6095d48
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/d6095d48
Branch: refs/heads/site
Commit: d6095d4876d55c8e614a11bf66d780e0856481f8
Parents: 55f6d6f
Author: Clay Baenziger <cb...@bloomberg.net>
Authored: Fri Feb 2 22:35:37 2018 -0500
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Feb 5 16:59:50 2018 -0800
----------------------------------------------------------------------
pom.xml | 51 ++++++++++++++++++--
.../internal/yarn/Hadoop20YarnAMClient.java | 7 +++
.../internal/yarn/Hadoop21YarnAMClient.java | 7 +++
.../internal/yarn/Hadoop22YarnAMClient.java | 2 +-
.../internal/yarn/Hadoop26YarnAMClient.java | 42 ++++++++++++++++
.../internal/yarn/AbstractYarnAMClient.java | 12 +++--
.../yarn/VersionDetectYarnAMClientFactory.java | 10 +++-
.../apache/twill/internal/yarn/YarnUtils.java | 13 ++++-
8 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 573936d..f804fd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,7 +171,6 @@
<junit.version>4.11</junit.version>
<jopt-simple.version>3.2</jopt-simple.version>
<commons-compress.version>1.5</commons-compress.version>
- <hadoop.version>[2.0.2-alpha,2.3.0]</hadoop.version>
<hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
</properties>
@@ -538,9 +537,6 @@
<properties>
<hadoop.version>2.3.0</hadoop.version>
</properties>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
<build>
<plugins>
<plugin>
@@ -723,6 +719,53 @@
</build>
</profile>
<profile>
+ <id>hadoop-2.6</id>
+ <properties>
+ <hadoop.version>2.6.5</hadoop.version>
+ </properties>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop21</source>
+ <source>src/main/hadoop22</source>
+ <source>src/main/hadoop23</source>
+ <source>src/main/hadoop26</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-2.0</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop20</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
<id>java8-test</id>
<modules>
<module>twill-java8-test</module>
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index a990cc0..67bef3e 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.apache.twill.internal.yarn.ports.AMRMClient;
import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
@@ -72,6 +74,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ protected ContainerId containerIdLookup(String containerIdStr) {
+ return (ConverterUtils.toContainerId(containerIdStr));
+ }
+
+ @Override
protected void startUp() throws Exception {
Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 82f428a..f349b4e 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +59,11 @@ public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.Contai
};
}
+ @Override
+ protected ContainerId containerIdLookup(String containerIdStr) {
+ return (ConverterUtils.toContainerId(containerIdStr));
+ }
+
protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
protected final Hadoop21YarnNMClient nmClient;
protected Resource maxCapability;
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
index 3ee99e8..c1bb5fd 100644
--- a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -26,7 +26,7 @@ import java.util.List;
/**
* Wrapper class for AMRMClient for Hadoop version 2.2 or greater.
*/
-public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+public class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
new file mode 100644
index 0000000..372c422
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Wrapper class for AMRMClient for Hadoop version 2.6 or greater.
+ */
+public final class Hadoop26YarnAMClient extends Hadoop22YarnAMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop26YarnAMClient.class);
+
+ public Hadoop26YarnAMClient(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected final ContainerId containerIdLookup(String containerIdStr) {
+ return (ContainerId.fromString(containerIdStr));
+ }
+}
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index e8f3b2b..b4b50cd 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.slf4j.Logger;
@@ -80,7 +79,7 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
String masterContainerId = System.getenv().get(containerIdEnvName);
Preconditions.checkArgument(masterContainerId != null,
"Missing %s from environment", containerIdEnvName);
- this.containerId = ConverterUtils.toContainerId(masterContainerId);
+ this.containerId = containerIdLookup(masterContainerId);
this.inflightRequests = ArrayListMultimap.create();
this.pendingRequests = ArrayListMultimap.create();
this.pendingRemoves = Lists.newLinkedList();
@@ -89,7 +88,6 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
this.blacklistedResources = Lists.newArrayList();
}
-
@Override
public final ContainerId getContainerId() {
return containerId;
@@ -227,6 +225,14 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
}
/**
+ * Returns the ContainerId given a container ID string
+ *
+ * @param containerIdStr the container ID string to lookup
+ * @return A {@link ContainerId} instance representing the result.
+ */
+ protected abstract ContainerId containerIdLookup(String containerIdStr);
+
+ /**
* Adjusts the given resource capability to fit in the cluster limit.
*
* @param capability The capability to be adjusted.
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index 943efb8..c6ab02b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -48,11 +48,17 @@ public final class VersionDetectYarnAMClientFactory implements YarnAMClientFacto
clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
clz = (Class<YarnAMClient>) Class.forName(clzName);
break;
- default:
- // Uses hadoop-2.2 or above class
+ case HADOOP_22:
+ case HADOOP_23:
+ // Uses hadoop-2.2 class
clzName = getClass().getPackage().getName() + ".Hadoop22YarnAMClient";
clz = (Class<YarnAMClient>) Class.forName(clzName);
break;
+ default:
+ // Uses hadoop-2.6 or above class
+ clzName = getClass().getPackage().getName() + ".Hadoop26YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ break;
}
return clz.getConstructor(Configuration.class).newInstance(conf);
http://git-wip-us.apache.org/repos/asf/twill/blob/d6095d48/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index c0aeb0c..d7e6eb0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -68,7 +68,8 @@ public class YarnUtils {
HADOOP_20,
HADOOP_21,
HADOOP_22,
- HADOOP_23
+ HADOOP_23,
+ HADOOP_26
}
private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
@@ -263,7 +264,15 @@ public class YarnUtils {
Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
try {
Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
- HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
+ try {
+ Class[] args = new Class[1];
+ args[0] = String.class;
+ // see if we have a org.apache.hadoop.yarn.api.records.ContainerId.fromString() method
+ Class.forName("org.apache.hadoop.yarn.api.records.ContainerId").getMethod("fromString", args);
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_26);
+ } catch (NoSuchMethodException e) {
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
+ }
} catch (ClassNotFoundException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
}
[03/15] twill git commit: (TWILL-248) Speedup shutdown of tracker
service
Posted by ch...@apache.org.
(TWILL-248) Speedup shutdown of tracker service
This closes #63 on Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b7785bde
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b7785bde
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b7785bde
Branch: refs/heads/site
Commit: b7785bde4e7e990072f803d89353e37f26ed8af5
Parents: aa70499
Author: Terence Yim <ch...@apache.org>
Authored: Mon Oct 30 15:10:34 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 31 10:52:16 2017 -0700
----------------------------------------------------------------------
.../internal/appmaster/TrackerService.java | 26 ++++++++++++++++----
1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/b7785bde/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index bb8cf57..10de10c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -31,6 +31,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -48,6 +50,8 @@ import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.internal.json.ResourceReportAdapter;
import org.slf4j.Logger;
@@ -60,6 +64,8 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -82,7 +88,7 @@ public final class TrackerService extends AbstractIdleService {
private String host;
private ServerBootstrap bootstrap;
- private Channel serverChannel;
+ private ChannelGroup channelGroup;
private InetSocketAddress bindAddress;
private URL url;
@@ -119,6 +125,7 @@ public final class TrackerService extends AbstractIdleService {
@Override
protected void startUp() throws Exception {
+ channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
EventLoopGroup bossGroup = new NioEventLoopGroup(NUM_BOSS_THREADS,
new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("boss-thread").build());
@@ -132,6 +139,7 @@ public final class TrackerService extends AbstractIdleService {
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
+ channelGroup.add(ch);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec", new HttpServerCodec());
pipeline.addLast("compressor", new HttpContentCompressor());
@@ -140,7 +148,9 @@ public final class TrackerService extends AbstractIdleService {
}
});
- serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+ Channel serverChannel = bootstrap.bind(new InetSocketAddress(host, 0)).sync().channel();
+ channelGroup.add(serverChannel);
+
bindAddress = (InetSocketAddress) serverChannel.localAddress();
url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
@@ -149,9 +159,15 @@ public final class TrackerService extends AbstractIdleService {
@Override
protected void shutDown() throws Exception {
- serverChannel.close().await();
- bootstrap.config().group().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
- bootstrap.config().childGroup().shutdownGracefully(1, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS).await();
+ channelGroup.close().awaitUninterruptibly();
+
+ List<Future<?>> futures = new ArrayList<>();
+ futures.add(bootstrap.config().group().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+ futures.add(bootstrap.config().childGroup().shutdownGracefully(0, CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS));
+
+ for (Future<?> future : futures) {
+ future.awaitUninterruptibly();
+ }
LOG.info("Tracker service stopped at {}", url);
}
[09/15] twill git commit: Add Yuliya Feldman to committer
Posted by ch...@apache.org.
Add Yuliya Feldman to committer
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/439c1096
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/439c1096
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/439c1096
Branch: refs/heads/site
Commit: 439c1096df09939e3f13749722746b767f4ca753
Parents: 7f494d1
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:49:49 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:49:49 2018 -0700
----------------------------------------------------------------------
pom.xml | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/439c1096/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f804fd7..84c4b8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,14 @@
<role>Committer</role>
</roles>
</developer>
+ <developer>
+ <id>yufeldman</id>
+ <name>Yuliya Feldman</name>
+ <email>yufeldman@apache.org</email>
+ <roles>
+ <role>Committer</role>
+ </roles>
+ </developer>
</developers>
<modules>
[13/15] twill git commit: Run mvn in --batch-mode to reduce verbosity
in travis build
Posted by ch...@apache.org.
Run mvn in --batch-mode to reduce verbosity in travis build
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/8f70aa4d
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/8f70aa4d
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/8f70aa4d
Branch: refs/heads/site
Commit: 8f70aa4d49243ef2a0ddc613ce2f3f9c22f80e97
Parents: 107dc1e
Author: Terence Yim <ch...@apache.org>
Authored: Sun Mar 18 22:21:34 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Mar 18 22:21:34 2018 -0700
----------------------------------------------------------------------
.travis.yml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/8f70aa4d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index fae270d..55101b7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,9 +31,9 @@ branches:
- /^branch\-.*$/
- /^feature\/.*$/
-script: mvn test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
-install: mvn install -P $PROFILE -DskipTests=true
+install: mvn --batch-mode install -P $PROFILE -DskipTests=true
env:
- PROFILE='hadoop-2.0'
[06/15] twill git commit: (TWILL-255) Incorrect logging after memory
was adjusted. Does not show memory before adjustment
Posted by ch...@apache.org.
(TWILL-255) Incorrect logging after memory was adjusted. Does not show memory before adjustment
This closes #66 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/00a844ad
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/00a844ad
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/00a844ad
Branch: refs/heads/site
Commit: 00a844adedd2b86d3c2ea55a58a4a743c1724aaf
Parents: d6095d4
Author: Yuliya Feldman <yu...@dremio.com>
Authored: Wed Feb 28 14:52:56 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 1 10:07:14 2018 -0800
----------------------------------------------------------------------
.../org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java | 2 +-
.../org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/00a844ad/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 67bef3e..76de0c0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -126,8 +126,8 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
if (resource.getMemory() != updatedMemory) {
- resource.setMemory(updatedMemory);
LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+ resource.setMemory(updatedMemory);
}
return resource;
http://git-wip-us.apache.org/repos/asf/twill/blob/00a844ad/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index f349b4e..42bff62 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -165,8 +165,8 @@ public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.Contai
int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
if (resource.getMemory() != updatedMemory) {
- resource.setMemory(updatedMemory);
LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+ resource.setMemory(updatedMemory);
}
return resource;
[14/15] twill git commit: (TWILL-258) Use loopback address for ZK
server. Also fixes some race conditions in unit tests
Posted by ch...@apache.org.
(TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests
- Fix a race condition in the LocationCacheTest
- There is a small delay between the current timestamp taken in
LocationCacheCleaner.start and the one taken in the test method.
- Fix race condition in LogLevelChangeTestRun
- The test assumes that once the root logger level has changed, the other logger levels have also
changed in the resource report, which is not true
- The test is not checking the log levels for all runnable instances
This closes #68 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/ee4d1370
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/ee4d1370
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/ee4d1370
Branch: refs/heads/site
Commit: ee4d13701b218305d034bfaa8474ef881995e65c
Parents: 8f70aa4
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 19 00:28:20 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Mar 20 12:54:58 2018 -0700
----------------------------------------------------------------------
.travis.yml | 2 +-
.../apache/twill/yarn/LocationCacheCleaner.java | 4 +-
.../apache/twill/yarn/LocationCacheTest.java | 11 +++-
.../twill/yarn/LogLevelChangeTestRun.java | 59 +++++++++++---------
.../internal/zookeeper/InMemoryZKServer.java | 18 ++----
5 files changed, 51 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 55101b7..af74548 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@ branches:
- /^branch\-.*$/
- /^feature\/.*$/
-script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false
install: mvn --batch-mode install -P $PROFILE -DskipTests=true
http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index 0738218..fed76a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -144,7 +144,9 @@ final class LocationCacheCleaner extends AbstractIdleService {
}
// If the location is already pending for cleanup, this won't update the expire time as
// the comparison of PendingCleanup is only by location.
- pendingCleanups.add(new PendingCleanup(location, expireTime));
+ if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
+ LOG.debug("Pending deletion of location {} with expiration time at {}", location, expireTime);
+ }
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 8ed9ae4..63ca28b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -109,12 +109,17 @@ public class LocationCacheTest {
newTwillRunner.start();
// Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
- // collected when the new twill runner was started
+ // collected when the new twill runner was started.
+ // Need to add some time in addition to the antique expiry time because the cache cleaner collects
+ // pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
+ // later than the System.currentTimeMillis() call in the next line.
((YarnTwillRunnerService) newTwillRunner)
- .forceLocationCacheCleanup(System.currentTimeMillis() + Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+ .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
+ Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
// Now there shouldn't be any file under the current session cache directory
- Assert.assertTrue(currentSessionCache.list().isEmpty());
+ List<Location> locations = currentSessionCache.list();
+ Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
}
/**
http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
index 6df6d11..a1d8ae6 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -172,20 +173,20 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
// assert that log level is DEBUG
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
// change the log level to INFO
controller.updateLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, LogEntry.Level.INFO)).get();
// assert log level has changed to INFO
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
// change the log level of LogLevelTestRunnable to WARN,
// change the log level of LogLevelTestSecondRunnable to TRACE
@@ -195,16 +196,16 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
controller.updateLogLevels(LogLevelTestSecondRunnable.class.getSimpleName(), logLevelSecondRunnable).get();
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN), 1);
waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE), 1);
// change a particular logger to log level warn and reset it back.
logLevelFirstRunnable = ImmutableMap.of("test", LogEntry.Level.WARN);
controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
20L, TimeUnit.SECONDS, LogEntry.Level.WARN,
- ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN));
+ ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN), 1);
logLevelFirstRunnable = new HashMap<>();
logLevelFirstRunnable.put("test", null);
controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
@@ -212,13 +213,13 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
result.put("ROOT", LogEntry.Level.WARN);
result.put("test", null);
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+ 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
// reset the log level for a particular logger of LogLevelTestRunnable
controller.resetRunnableLogLevels(LogLevelTestRunnable.class.getSimpleName(), "test").get();
result.remove("test");
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+ 20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
// change the log level of LogLevelTestSecondRunnable to INFO and change instances of it to test if the log level
// request get applied to container started up later
@@ -228,14 +229,14 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
controller.changeInstances(LogLevelTestSecondRunnable.class.getSimpleName(), 2).get();
TimeUnit.SECONDS.sleep(5);
waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(), 20L, TimeUnit.SECONDS,
- LogEntry.Level.INFO, logLevelSecondRunnable);
+ LogEntry.Level.INFO, logLevelSecondRunnable, 2);
// reset the log levels back to default.
controller.resetLogLevels().get();
waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
- 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+ 20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 2);
// stop
controller.terminate().get(120, TimeUnit.SECONDS);
@@ -248,29 +249,37 @@ public class LogLevelChangeTestRun extends BaseYarnTest {
// could return null if the application has not fully started.
private void waitForLogLevel(TwillController controller, String runnable, long timeout,
TimeUnit timeoutUnit, LogEntry.Level expected,
- Map<String, LogEntry.Level> expectedArgs) throws InterruptedException {
+ Map<String, LogEntry.Level> expectedArgs,
+ int expectedInstances) throws InterruptedException {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
- LogEntry.Level actual = null;
- Map<String, LogEntry.Level> actualArgs = null;
- boolean stopped = false;
- do {
+ while (stopwatch.elapsedTime(timeoutUnit) < timeout) {
ResourceReport report = controller.getResourceReport();
+
if (report == null || report.getRunnableResources(runnable) == null) {
+ TimeUnit.MILLISECONDS.sleep(100);
continue;
}
+
+ int matchCount = 0;
for (TwillRunResources resources : report.getRunnableResources(runnable)) {
- actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
- actualArgs = resources.getLogLevels();
- if (actual != null && actual.equals(expected)) {
- stopped = true;
- break;
+ LogEntry.Level actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+ Map<String, LogEntry.Level> actualArgs = resources.getLogLevels();
+ if (Objects.equals(expected, actual) && Objects.equals(expectedArgs, actualArgs)) {
+ matchCount++;
+ } else {
+ LOG.info("Log levels not match for {}. {} != {} or {} != {}",
+ runnable, expected, actual, expectedArgs, actualArgs);
}
}
+
+ if (matchCount == expectedInstances) {
+ return;
+ }
TimeUnit.MILLISECONDS.sleep(100);
- } while (!stopped && stopwatch.elapsedTime(timeoutUnit) < timeout);
- Assert.assertEquals(expected, actual);
- Assert.assertEquals(expectedArgs, actualArgs);
+ }
+
+ Assert.fail("Timeout waiting for expected log levels");
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ee4d1370/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
index f962b68..d18d5ed 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
@@ -18,7 +18,6 @@
package org.apache.twill.internal.zookeeper;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.concurrent.Executor;
/**
@@ -103,17 +101,11 @@ public final class InMemoryZKServer implements Service {
}
private InetSocketAddress getAddress(int port) {
- try {
- int socketPort = port < 0 ? 0 : port;
- // This property is needed so that in certain CI environment (e.g. Travis-CI) it can only works properly if
- // it is binded to the wildcard (0.0.0.0) address
- if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
- return new InetSocketAddress(InetAddress.getLocalHost(), socketPort);
- } else {
- return new InetSocketAddress(socketPort);
- }
- } catch (UnknownHostException e) {
- throw Throwables.propagate(e);
+ int socketPort = port < 0 ? 0 : port;
+ if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
+ return new InetSocketAddress(InetAddress.getLoopbackAddress(), socketPort);
+ } else {
+ return new InetSocketAddress(socketPort);
}
}
[08/15] twill git commit: Added the missing hadoop-2.6 profile in the
twill-yarn module
Posted by ch...@apache.org.
Added the missing hadoop-2.6 profile in the twill-yarn module
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/7f494d13
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/7f494d13
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/7f494d13
Branch: refs/heads/site
Commit: 7f494d13da45e6f9a27dfe1b4bde9d521d306c20
Parents: af60a02
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 13:55:05 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Mar 9 13:55:05 2018 -0800
----------------------------------------------------------------------
twill-yarn/pom.xml | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/7f494d13/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 9e99886..ae68a2e 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -178,6 +178,19 @@
</build>
</profile>
<profile>
+ <id>hadoop-2.6</id>
+ <build>
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ <profile>
<id>mapr-hadoop-2.4</id>
<dependencies>
<dependency>
[11/15] twill git commit: Update ReleaseGuide to include latest
version of Hadoop
Posted by ch...@apache.org.
Update ReleaseGuide to include latest version of Hadoop
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/4b0cf5e7
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/4b0cf5e7
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/4b0cf5e7
Branch: refs/heads/site
Commit: 4b0cf5e7164dd12ad9d73b9420f6bc4e7a5667a1
Parents: 1c8fec8
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:50:26 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:50:26 2018 -0700
----------------------------------------------------------------------
src/site/markdown/ReleaseGuide.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/4b0cf5e7/src/site/markdown/ReleaseGuide.md
----------------------------------------------------------------------
diff --git a/src/site/markdown/ReleaseGuide.md b/src/site/markdown/ReleaseGuide.md
index 551d48d..0fd59a1 100644
--- a/src/site/markdown/ReleaseGuide.md
+++ b/src/site/markdown/ReleaseGuide.md
@@ -82,8 +82,8 @@ git push origin v${RELEASE_VERSION}
#### Build the source tarball and publish artifacts to the staging repo
```
mvn clean prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.0 &&
-mvn prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.3 &&
-mvn deploy -DskipTests -Dremoteresources.skip=true -P hadoop-2.3 -P apache-release
+mvn prepare-package -DskipTests -Dremoteresources.skip=true -P hadoop-2.6 &&
+mvn deploy -DskipTests -Dremoteresources.skip=true -P hadoop-2.6 -P apache-release
```
The source tarball can be found in `target/apache-twill-${RELEASE_VERSION}-source-release.tar.gz`
after the above command has successfully completed.
[15/15] twill git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/2a97588c
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/2a97588c
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/2a97588c
Branch: refs/heads/site
Commit: 2a97588ccf78f6782682635bab8a147f8b930f80
Parents: 4b0cf5e ee4d137
Author: Terence Yim <ch...@apache.org>
Authored: Wed Mar 21 22:19:38 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Mar 21 22:19:38 2018 -0700
----------------------------------------------------------------------
.travis.yml | 4 +-
pom.xml | 14 ++
.../main/java/org/apache/twill/api/Configs.java | 12 ++
.../twill/internal/AbstractTwillController.java | 18 ++
.../twill/internal/AbstractTwillService.java | 14 +-
.../kafka/client/SimpleKafkaConsumer.java | 19 +-
.../kafka/client/SimpleKafkaPublisher.java | 41 ++--
.../internal/kafka/client/ZKBrokerService.java | 39 ++--
.../internal/yarn/Hadoop21YarnAppClient.java | 17 +-
.../internal/yarn/Hadoop23YarnAppClient.java | 4 +-
.../internal/yarn/Hadoop26YarnAppClient.java | 48 +++++
.../appmaster/ApplicationMasterMain.java | 7 +-
.../appmaster/ApplicationMasterService.java | 10 +-
.../yarn/VersionDetectYarnAppClientFactory.java | 6 +-
.../apache/twill/yarn/LocationCacheCleaner.java | 4 +-
.../apache/twill/yarn/YarnTwillController.java | 11 ++
.../apache/twill/yarn/AppRecoveryTestRun.java | 189 +++++++++++++++++++
.../apache/twill/yarn/LocationCacheTest.java | 11 +-
.../twill/yarn/LogLevelChangeTestRun.java | 59 +++---
.../internal/zookeeper/InMemoryZKServer.java | 18 +-
.../apache/twill/zookeeper/ZKOperations.java | 72 +++++++
21 files changed, 519 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/2a97588c/pom.xml
----------------------------------------------------------------------
[02/15] twill git commit: Extra commit to close PR
Posted by ch...@apache.org.
Extra commit to close PR
This closes #62 on Github
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/aa70499e
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/aa70499e
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/aa70499e
Branch: refs/heads/site
Commit: aa70499ea083a783cda4daf3261aec0383fb1aa6
Parents: 6d6b388
Author: Terence Yim <ch...@apache.org>
Authored: Mon Oct 30 15:25:44 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Oct 30 15:25:44 2017 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------
[10/15] twill git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/1c8fec89
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/1c8fec89
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/1c8fec89
Branch: refs/heads/site
Commit: 1c8fec89207fe2875c688acbb8285e7acf848907
Parents: 49f3595 439c109
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 12 18:50:02 2018 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Mar 12 18:50:02 2018 -0700
----------------------------------------------------------------------
.travis.yml | 7 +-
pom.xml | 73 +++++-
twill-core/pom.xml | 10 +-
twill-yarn/pom.xml | 13 ++
.../internal/yarn/Hadoop20YarnAMClient.java | 9 +-
.../internal/yarn/Hadoop20YarnNMClient.java | 12 +-
.../internal/yarn/Hadoop21YarnAMClient.java | 9 +-
.../internal/yarn/Hadoop21YarnNMClient.java | 14 +-
.../internal/yarn/Hadoop22YarnAMClient.java | 2 +-
.../internal/yarn/Hadoop26YarnAMClient.java | 42 ++++
.../internal/appmaster/TrackerService.java | 231 ++++++++++---------
.../internal/yarn/AbstractYarnAMClient.java | 12 +-
.../yarn/VersionDetectYarnAMClientFactory.java | 10 +-
.../apache/twill/internal/yarn/YarnUtils.java | 13 +-
.../apache/twill/yarn/ResourceReportClient.java | 36 ++-
15 files changed, 351 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/1c8fec89/pom.xml
----------------------------------------------------------------------
[12/15] twill git commit: (TWILL-61) Fix to allow higher attempts to
relaunch the app after the first attempt failed
Posted by ch...@apache.org.
(TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed
- Delete the Kafka root zk node for the application if it already exists
- Delete the AM instance zk node if it already exists
- For the runnables' parent zk node, it is not an error if it already exists
- Enhance KafkaClient publisher / consumer to deal with Kafka cluster changes
- When the AM is killed and restarted, the embedded Kafka will be running on a different host and port
This closes #67 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/107dc1e2
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/107dc1e2
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/107dc1e2
Branch: refs/heads/site
Commit: 107dc1e20c63207695bb5f8b5f97186b6b3f9412
Parents: 439c109
Author: Terence Yim <ch...@apache.org>
Authored: Fri Mar 9 12:21:26 2018 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 15 16:13:34 2018 -0700
----------------------------------------------------------------------
pom.xml | 14 ++
.../main/java/org/apache/twill/api/Configs.java | 12 ++
.../twill/internal/AbstractTwillController.java | 18 ++
.../twill/internal/AbstractTwillService.java | 14 +-
.../kafka/client/SimpleKafkaConsumer.java | 19 +-
.../kafka/client/SimpleKafkaPublisher.java | 41 ++--
.../internal/kafka/client/ZKBrokerService.java | 39 ++--
.../internal/yarn/Hadoop21YarnAppClient.java | 17 +-
.../internal/yarn/Hadoop23YarnAppClient.java | 4 +-
.../internal/yarn/Hadoop26YarnAppClient.java | 48 +++++
.../appmaster/ApplicationMasterMain.java | 7 +-
.../appmaster/ApplicationMasterService.java | 10 +-
.../yarn/VersionDetectYarnAppClientFactory.java | 6 +-
.../apache/twill/yarn/YarnTwillController.java | 11 ++
.../apache/twill/yarn/AppRecoveryTestRun.java | 189 +++++++++++++++++++
.../apache/twill/zookeeper/ZKOperations.java | 72 +++++++
16 files changed, 467 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 84c4b8c..45aa64d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,7 @@
<jopt-simple.version>3.2</jopt-simple.version>
<commons-compress.version>1.5</commons-compress.version>
<hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+ <force.mac.tests>false</force.mac.tests>
</properties>
<scm>
@@ -346,6 +347,7 @@
<redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ <force.mac.tests>${force.mac.tests}</force.mac.tests>
</systemPropertyVariables>
<reuseForks>false</reuseForks>
<reportFormat>plain</reportFormat>
@@ -362,6 +364,18 @@
<profiles>
<profile>
+ <!--
+ This profile is to force certain tests to run on Mac.
+ Those tests are disabled due to orphan processes left after the test run (HADOOP-12317).
+ If this profile is enabled, after the test finished, run the `jps` command
+ and delete all `TwillLauncher` processes
+ -->
+ <id>force-mac-tests</id>
+ <properties>
+ <force.mac.tests>true</force.mac.tests>
+ </properties>
+ </profile>
+ <profile>
<id>apache-release</id>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-api/src/main/java/org/apache/twill/api/Configs.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 9a21489..20a25f6 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -79,6 +79,18 @@ public final class Configs {
public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
/**
+ * Maximum number of attempts to run the application by YARN if there is failure.
+ */
+ public static final String YARN_MAX_APP_ATTEMPTS = "twill.yarn.max.app.attempts";
+
+ /**
+ * Interval time in milliseconds for the attempt failures validity interval in YARN. YARN only limit to
+ * the maximum attempt count for failures in the given interval.
+ */
+ public static final String YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL =
+ "twill.yarn.attempt.failures.validity.interval";
+
+ /**
* Setting for enabling log collection.
*/
public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index fd8a939..0ff2fc8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -222,6 +222,24 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
}
+ /**
+ * Reset the log handler to poll from the beginning of Kafka.
+ */
+ protected final synchronized void resetLogHandler() {
+ if (kafkaClient == null) {
+ return;
+ }
+ if (logCancellable != null) {
+ logCancellable.cancel();
+ logCancellable = null;
+ }
+ if (!logHandlers.isEmpty()) {
+ logCancellable = kafkaClient.getConsumer().prepare()
+ .addFromBeginning(Constants.LOG_TOPIC, 0)
+ .consume(new LogMessageCallback(logHandlers));
+ }
+ }
+
private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
ResourceReport resourceReport = getResourceReport();
if (resourceReport == null) {
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8e73653..425cd43 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -206,7 +206,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
/**
- * Update the live node for the runnable.
+ * Update the live node for the service.
*
* @return A {@link OperationFuture} that will be completed when the update is done.
*/
@@ -216,11 +216,15 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
return zkClient.setData(liveNodePath, serializeLiveNode());
}
+ /**
+ * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+ *
+ * @return A {@link OperationFuture} that will be completed when the creation is done.
+ */
private OperationFuture<String> createLiveNode() {
- String liveNodePath = getLiveNodePath();
- LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
- return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
- KeeperException.NodeExistsException.class, liveNodePath);
+ final String liveNodePath = getLiveNodePath();
+ LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath);
+ return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true);
}
private OperationFuture<String> removeLiveNode() {
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 73235b7..f69350e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -340,17 +340,17 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
continue;
}
- // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
- // from kafak server.
- long off = offset.get();
- if (off < 0) {
- offset.set(getLastOffset(topicPart, off));
- }
+ try {
+ // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
+ // from kafak server.
+ long off = offset.get();
+ if (off < 0) {
+ offset.set(getLastOffset(topicPart, off));
+ }
- SimpleConsumer consumer = consumerEntry.getValue();
+ SimpleConsumer consumer = consumerEntry.getValue();
- // Fire a fetch message request
- try {
+ // Fire a fetch message request
FetchResponse response = fetchMessages(consumer, offset.get());
// Failure response, set consumer entry to null and let next round of loop to handle it.
@@ -364,6 +364,7 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
+ backoff.backoff();
continue;
}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index f147d24..e5d0f8d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -54,11 +54,11 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
private final AtomicReference<Producer<Integer, ByteBuffer>> producer;
private final AtomicBoolean listenerCancelled;
- public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
+ SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
this.brokerService = brokerService;
this.ack = ack;
this.compression = compression;
- this.producer = new AtomicReference<Producer<Integer, ByteBuffer>>();
+ this.producer = new AtomicReference<>();
this.listenerCancelled = new AtomicBoolean(false);
}
@@ -107,7 +107,7 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
@Override
public Preparer add(ByteBuffer message, Object partitionKey) {
- messages.add(new KeyedMessage<Integer, ByteBuffer>(topic, Math.abs(partitionKey.hashCode()), message));
+ messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message));
return this;
}
@@ -159,30 +159,37 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
}
String newBrokerList = brokerService.getBrokerList();
- if (newBrokerList.isEmpty()) {
- LOG.warn("Broker list is empty. No Kafka producer is created.");
- return;
- }
+ // If there is no change, whether it is empty or not, just return
if (Objects.equal(brokerList, newBrokerList)) {
return;
}
- Properties props = new Properties();
- props.put("metadata.broker.list", newBrokerList);
- props.put("serializer.class", ByteBufferEncoder.class.getName());
- props.put("key.serializer.class", IntegerEncoder.class.getName());
- props.put("partitioner.class", IntegerPartitioner.class.getName());
- props.put("request.required.acks", Integer.toString(ack.getAck()));
- props.put("compression.codec", compression.getCodec());
+ Producer<Integer, ByteBuffer> newProducer = null;
+ if (!newBrokerList.isEmpty()) {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", newBrokerList);
+ props.put("serializer.class", ByteBufferEncoder.class.getName());
+ props.put("key.serializer.class", IntegerEncoder.class.getName());
+ props.put("partitioner.class", IntegerPartitioner.class.getName());
+ props.put("request.required.acks", Integer.toString(ack.getAck()));
+ props.put("compression.codec", compression.getCodec());
+
+ ProducerConfig config = new ProducerConfig(props);
+ newProducer = new Producer<>(config);
+ }
- ProducerConfig config = new ProducerConfig(props);
- Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
+ // If the broker list is empty, the producer will be set to null
+ Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
if (oldProducer != null) {
oldProducer.close();
}
- LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+ if (newBrokerList.isEmpty()) {
+ LOG.warn("Empty Kafka producer broker list, publish will fail.");
+ } else {
+ LOG.info("Updated Kafka producer broker list: {}", newBrokerList);
+ }
brokerList = newBrokerList;
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
index 2ffc604..de42b9b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
@@ -51,6 +51,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
/**
* A {@link BrokerService} that watches kafka zk nodes for updates of broker lists and leader for
@@ -136,11 +139,13 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
return brokerList.get();
}
- final SettableFuture<?> readerFuture = SettableFuture.create();
- final AtomicReference<Iterable<BrokerInfo>> brokers =
- new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());
+ final SettableFuture<?> readyFuture = SettableFuture.create();
+ final AtomicReference<List<BrokerInfo>> brokers = new AtomicReference<>(Collections.<BrokerInfo>emptyList());
actOnExists(BROKER_IDS_PATH, new Runnable() {
+
+ final Runnable thisRunnable = this;
+
@Override
public void run() {
// Callback for fetching children list. This callback should be executed in the executorService.
@@ -154,19 +159,19 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
Iterables.transform(
brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
Suppliers.<BrokerInfo>supplierFunction())));
- readerFuture.set(null);
+ readyFuture.set(null);
for (ListenerExecutor listener : listeners) {
listener.changed(ZKBrokerService.this);
}
} catch (ExecutionException e) {
- readerFuture.setException(e.getCause());
+ readyFuture.setException(e.getCause());
}
}
@Override
public void onFailure(Throwable t) {
- readerFuture.setException(t);
+ readyFuture.setException(t);
}
};
@@ -179,15 +184,25 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
}
if (event.getType() == Event.EventType.NodeChildrenChanged) {
Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
+ } else if (event.getType() == Event.EventType.NodeDeleted) {
+ // If the ids node is deleted, clear the broker list and re-watch.
+ // This could happen when the Kafka server is restarted and have the ZK node cleanup
+ // The readyFuture for this call doesn't matter, as we don't need to block on anything
+ brokers.set(Collections.<BrokerInfo>emptyList());
+ for (ListenerExecutor listener : listeners) {
+ listener.changed(ZKBrokerService.this);
+ }
+ actOnExists(BROKER_IDS_PATH, thisRunnable, SettableFuture.create(),
+ FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
}
}
}), childrenCallback, executorService);
}
- }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
+ }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
- brokerList = createSupplier(brokers);
+ brokerList = this.<Iterable<BrokerInfo>>createSupplier(brokers);
try {
- readerFuture.get();
+ readyFuture.get();
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -223,7 +238,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
public Supplier<T> load(final K key) throws Exception {
// A future to tell if the result is ready, even it is failure.
final SettableFuture<T> readyFuture = SettableFuture.create();
- final AtomicReference<T> resultValue = new AtomicReference<T>();
+ final AtomicReference<T> resultValue = new AtomicReference<>();
// Fetch for node data when it exists.
final String path = key.getPath();
@@ -312,7 +327,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
}
}), new FutureCallback<Stat>() {
@Override
- public void onSuccess(Stat result) {
+ public void onSuccess(@Nullable Stat result) {
if (result != null) {
action.run();
} else {
@@ -345,7 +360,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker
/**
* Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}.
*/
- private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) {
+ private <T> Supplier<T> createSupplier(final AtomicReference<? extends T> ref) {
return new Supplier<T>() {
@Override
public T get() {
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index aa14a75..c219171 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -59,7 +59,7 @@ import javax.annotation.Nullable;
public class Hadoop21YarnAppClient implements YarnAppClient {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
- private final Configuration configuration;
+ protected final Configuration configuration;
public Hadoop21YarnAppClient(Configuration configuration) {
this.configuration = configuration;
@@ -108,7 +108,7 @@ public class Hadoop21YarnAppClient implements YarnAppClient {
addRMToken(launchContext, yarnClient, appId);
appSubmissionContext.setAMContainerSpec(launchContext);
appSubmissionContext.setResource(capability);
- appSubmissionContext.setMaxAppAttempts(2);
+ configureAppSubmissionContext(appSubmissionContext);
yarnClient.submitApplication(appSubmissionContext);
return new ProcessControllerImpl(appId);
@@ -126,6 +126,19 @@ public class Hadoop21YarnAppClient implements YarnAppClient {
}
}
+ /**
+ * Updates the {@link ApplicationSubmissionContext} based on configuration.
+ */
+ protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+ int maxAttempts = configuration.getInt(Configs.Keys.YARN_MAX_APP_ATTEMPTS, -1);
+ if (maxAttempts > 0) {
+ context.setMaxAppAttempts(maxAttempts);
+ } else {
+ // Preserve the old behavior
+ context.setMaxAppAttempts(2);
+ }
+ }
+
private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
int maxMemory = response.getMaximumResourceCapability().getMemory();
int updatedMemory = capability.getMemory();
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
index 97d2a64..0e3382f 100644
--- a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
@@ -48,14 +48,12 @@ import java.util.List;
* </p>
*/
@SuppressWarnings("unused")
-public final class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
+public class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop23YarnAppClient.class);
- private final Configuration configuration;
public Hadoop23YarnAppClient(Configuration configuration) {
super(configuration);
- this.configuration = configuration;
}
/**
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
new file mode 100644
index 0000000..1c27518
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.twill.api.Configs;
+
+/**
+ * <p>
+ * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.6 and beyond. On top of the parent
+ * behavior, it sets the attempt failures validity interval on the submission context if a positive value is set.
+ * The {@link VersionDetectYarnAppClientFactory} class will decide to return an instance of this class for
+ * Apache Hadoop 2.6 and beyond.
+ * </p>
+ */
+@SuppressWarnings("unused")
+public class Hadoop26YarnAppClient extends Hadoop23YarnAppClient {
+
+ public Hadoop26YarnAppClient(Configuration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+ super.configureAppSubmissionContext(context);
+ long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L);
+ if (interval > 0) {
+ context.setAttemptFailuresValidityInterval(interval);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 445656d..7706d52 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -165,9 +165,10 @@ public final class ApplicationMasterMain extends ServiceMain {
@Override
protected void startUp() throws Exception {
- ZKOperations.ignoreError(
- zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
- KeeperException.NodeExistsException.class, kafkaZKPath).get();
+ // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+ // no leftover content from a previous AM attempt.
+ LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+ ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
kafkaServer.startAndWait();
}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 6fc31f5..8a80041 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -78,7 +78,9 @@ import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,8 +320,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
- // Creates ZK path for runnable
- zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
+ // Creates ZK path for runnable. It's ok if the path already exists.
+ // That's for the case when the AM gets killed and restarted
+ ZKOperations.ignoreError(
+ zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null)
+ .get();
runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
runnableContainerRequests = initContainerRequests();
}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index c8e88c9..83de2a4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -39,9 +39,13 @@ public final class VersionDetectYarnAppClientFactory implements YarnAppClientFac
// 2.1 and 2.2 uses the same YarnAppClient
clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
break;
- default:
+ case HADOOP_23:
// 2.3 and above uses the 2.3 YarnAppClient to support RM HA
clzName = getClass().getPackage().getName() + ".Hadoop23YarnAppClient";
+ break;
+ default:
+ // Anything above 2.3 will be 2.6 and beyond
+ clzName = getClass().getPackage().getName() + ".Hadoop26YarnAppClient";
}
Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
return clz.getConstructor(Configuration.class).newInstance(configuration);
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 335d7ec..8f844a2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,6 +69,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill
private final TimeUnit startTimeoutUnit;
private volatile ApplicationMasterLiveNodeData amLiveNodeData;
private ProcessController<YarnApplicationReport> processController;
+ private ApplicationAttemptId currentAttemptId;
// Thread for polling yarn for application status if application got ZK session expire.
// Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback.
@@ -141,6 +143,8 @@ final class YarnTwillController extends AbstractTwillController implements Twill
LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId);
forceShutDown();
}
+
+ currentAttemptId = report.getCurrentApplicationAttemptId();
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -273,6 +277,13 @@ final class YarnTwillController extends AbstractTwillController implements Twill
shutdown = true;
break;
}
+ ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
+ if (currentAttemptId.compareTo(attemptId) != 0) {
+ LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId);
+ currentAttemptId = attemptId;
+ resetLogHandler();
+ }
+
// Make a sync exists call to instance node and re-watch if the node exists
try {
// The timeout is arbitrary, as it's just for avoiding block forever
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
new file mode 100644
index 0000000..4f5adce
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for application master resilience: the AM is killed and the app and log collection must resume.
+ */
+public class AppRecoveryTestRun extends BaseYarnTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testAMRestart() throws Exception {
+ // Only run it with Hadoop-2.1 or above
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0);
+ // Don't run this test on Mac, as there would be leftover java processes (HADOOP-12317).
+ // The test can be forced to run by turning on the "force-mac-tests" maven profile.
+ // After the test finishes, run the `jps` command and kill all `TwillLauncher` processes.
+ Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) ||
+ !System.getProperty("os.name").toLowerCase().contains("mac"));
+
+ File watchFile = TEMP_FOLDER.newFile();
+ watchFile.delete();
+
+ // Start the testing app, and wait for 4 log lines (2 per AM attempt) that match the messages emitted by the
+ // event handler (in the AM) and by the runnable
+ final Semaphore semaphore = new Semaphore(0);
+ TwillRunner runner = getTwillRunner();
+ TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile)))
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ // Use a log handler to match messages from AM and the Runnable to make sure the log collection gets resumed
+ // correctly after the AM restarts
+ .addLogHandler(new LogHandler() {
+ @Override
+ public void onLog(LogEntry logEntry) {
+ String message = logEntry.getMessage();
+ if (message.equals("Container for " + TestRunnable.class.getSimpleName() + " launched")) {
+ semaphore.release();
+ } else if (message.equals("Running 0")) {
+ semaphore.release();
+ }
+ }
+ })
+ .start();
+
+ // Wait for the first attempt to be running
+ Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+ // Touch the watchFile so that the event handler will kill the AM
+ Files.touch(watchFile);
+ // Wait for the second attempt to be running
+ Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+
+ controller.terminate().get();
+ }
+
+ /**
+ * An {@link EventHandler} for killing the first attempt of the application once the watch file exists.
+ */
+ public static final class TestEventHandler extends EventHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class);
+
+ private File watchFile;
+
+ TestEventHandler(File watchFile) {
+ this.watchFile = watchFile;
+ }
+
+ @Override
+ public void containerLaunched(String runnableName, int instanceId, String containerId) {
+ LOG.info("Container for {} launched", runnableName);
+
+ if (containerId.contains("_01_")) {
+ final File watchFile = new File(context.getSpecification().getConfigs().get("watchFile"));
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ // Wait for the watch file to be available, then kill the process
+ while (!watchFile.exists()) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ Runtime.getRuntime().halt(-1);
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+ @Override
+ protected Map<String, String> getConfigs() {
+ return Collections.singletonMap("watchFile", watchFile.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Application for testing; runs a single {@link TestRunnable} with the given event handler.
+ */
+ public static final class TestApp implements TwillApplication {
+
+ private final EventHandler eventHandler;
+
+ public TestApp(EventHandler eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("TestApp")
+ .withRunnable()
+ .add(new TestRunnable()).noLocalFiles()
+ .anyOrder()
+ .withEventHandler(eventHandler).build();
+ }
+ }
+
+ /**
+ * Runnable for testing; logs an increasing counter every 2 seconds until it is stopped or interrupted.
+ */
+ public static final class TestRunnable extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class);
+
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ @Override
+ public void run() {
+ long count = 0;
+ try {
+ while (!stopLatch.await(2, TimeUnit.SECONDS)) {
+ LOG.info("Running {}", count++);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopLatch.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
index 0e2239d..bce6391 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -25,16 +25,21 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
/**
* Collection of helper methods for common operations that usually needed when interacting with ZooKeeper.
@@ -282,6 +287,73 @@ public final class ZKOperations {
}
/**
+ * Creates a ZK node at the given path. If the node already exists, it is deleted (recursively) and the creation
+ * is retried. When no {@code acls} are given, {@link ZooDefs.Ids#OPEN_ACL_UNSAFE} is used.
+ */
+ public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
+ @Nullable final byte[] data, final CreateMode createMode,
+ final boolean createParent, final ACL...acls) {
+ final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
+ Threads.SAME_THREAD_EXECUTOR);
+ final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+ createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
+
+ final FutureCallback<String> createCallback = this;
+
+ @Override
+ public void onSuccess(String result) {
+ // Create succeeded, just set the result to the resultFuture
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable createFailure) {
+ // If create failed for a reason other than NodeExistsException, just set the exception to the result future
+ if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+ resultFuture.setException(createFailure);
+ return;
+ }
+
+ // Try to delete the path
+ LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+ Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // If delete succeeded, perform the creation again.
+ createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If deletion failed for a reason other than NoNodeException, fail the result with the create failure
+ if (!(t instanceof KeeperException.NoNodeException)) {
+ createFailure.addSuppressed(t);
+ resultFuture.setException(createFailure);
+ return;
+ }
+
+ // If can't delete because the node no longer exists, just go ahead and recreate the node
+ createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ });
+
+ return resultFuture;
+ }
+
+ /**
+ * Private helper method to create a ZK node based on the parameters. The result of the creation is always
+ * communicated via the provided {@link FutureCallback}, registered with {@link Threads#SAME_THREAD_EXECUTOR}.
+ */
+ private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent,
+ Iterable<ACL> acls, FutureCallback<String> callback) {
+ Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
+ callback, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
* Watch for the given path until it exists.
* @param zkClient The {@link ZKClient} to use.
* @param path A ZooKeeper path to watch for existent.