You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/01/26 23:32:21 UTC
[3/5] incubator-twill git commit: (TWILL-80) Perform log force flush
correctly. Also, service starts/stops for containers (both AM and runnable)
are refactored to give a cleaner view of the service dependencies. This
helps make sure unregistration from the RM is done last on the AM.
(TWILL-80) Perform log force flush correctly. Also, service starts/stops for
containers (both AM and runnable) are refactored to give a cleaner view of the
service dependencies. This helps make sure unregistration from the RM is done
last on the AM, providing a more accurate view for the RM.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/5dfe40d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/5dfe40d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/5dfe40d3
Branch: refs/heads/feature/improve-log
Commit: 5dfe40d37dfbaedd13e9597ece54ecbe821d2c4e
Parents: 73ecf8e
Author: Terence Yim <ch...@apache.org>
Authored: Fri Dec 12 14:47:58 2014 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Dec 15 15:11:18 2014 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
twill-common/pom.xml | 1 -
.../apache/twill/common/CompositeService.java | 109 ++++++++++++++
.../apache/twill/filesystem/LocalLocation.java | 5 +
.../twill/common/CompositeServiceTest.java | 145 ++++++++++++++++++
.../twill/internal/AbstractTwillService.java | 25 +---
.../twill/internal/ApplicationBundler.java | 17 +--
.../twill/internal/TwillContainerLauncher.java | 2 +-
.../twill/internal/logging/KafkaAppender.java | 6 +-
.../apache/twill/filesystem/HDFSLocation.java | 5 +
.../org/apache/twill/internal/ServiceMain.java | 91 ++++++++++--
.../appmaster/ApplicationMasterMain.java | 146 +++++++++++++++++--
.../appmaster/ApplicationMasterService.java | 128 ++--------------
.../internal/appmaster/TrackerService.java | 37 +++--
.../internal/container/TwillContainerMain.java | 36 +++--
.../container/TwillContainerService.java | 13 +-
.../internal/yarn/AbstractYarnTwillService.java | 2 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 2 +-
.../twill/yarn/YarnTwillRunnerService.java | 2 +-
.../twill/yarn/InitializeFailTestRun.java | 33 ++++-
.../apache/twill/yarn/SessionExpireTestRun.java | 4 +-
.../zookeeper/DefaultZKClientService.java | 5 +-
22 files changed, 608 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index be184dd..3b8dfed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -339,7 +339,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.14.1</version>
<configuration>
- <argLine>-Xmx1024m -XX:MaxPermSize=256m -Djava.awt.headless=true</argLine>
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m -Djava.awt.headless=true</argLine>
<redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index e57bad9..cf4c2c0 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -41,7 +41,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/common/CompositeService.java b/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
new file mode 100644
index 0000000..2817988
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/common/CompositeService.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+
+/**
+ * A {@link Service} that starts a list of services in order and stops them in reverse order.
+ */
+public final class CompositeService extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class);
+ private final Deque<Service> services;
+
+ public CompositeService(Service...services) {
+ this(ImmutableList.copyOf(services));
+ }
+
+ public CompositeService(Iterable<? extends Service> services) {
+ this.services = new ArrayDeque<Service>();
+ Iterables.addAll(this.services, services);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Throwable failureCause = null;
+
+ for (Service service : services) {
+ try {
+ service.startAndWait();
+ } catch (UncheckedExecutionException e) {
+ failureCause = e.getCause();
+ break;
+ }
+ }
+
+ if (failureCause != null) {
+ // Stop all running services and then throw the failure exception
+ try {
+ stopAll();
+ } catch (Throwable t) {
+ // Ignore the stop error. Just log.
+ LOG.warn("Failed when stopping all services on start failure", t);
+ }
+
+ Throwables.propagateIfPossible(failureCause, Exception.class);
+ throw new RuntimeException(failureCause);
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ stopAll();
+ }
+
+ private void stopAll() throws Exception {
+ Throwable failureCause = null;
+
+ // Stop services in reverse order.
+ Iterator<Service> itor = services.descendingIterator();
+ while (itor.hasNext()) {
+ Service service = itor.next();
+ try {
+ if (service.isRunning() || service.state() == State.STARTING) {
+ service.stopAndWait();
+ }
+ } catch (UncheckedExecutionException e) {
+ // Just catch as we want all services stopped
+ if (failureCause == null) {
+ failureCause = e.getCause();
+ } else {
+ // Log subsequent service shutdown errors, as only the first failure cause will be thrown.
+ LOG.warn("Failed to stop service {}", service, e);
+ }
+ }
+ }
+
+ if (failureCause != null) {
+ Throwables.propagateIfPossible(failureCause, Exception.class);
+ throw new RuntimeException(failureCause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index 6dec09c..3b01e61 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -251,4 +251,9 @@ final class LocalLocation implements Location {
public int hashCode() {
return file.hashCode();
}
+
+ @Override
+ public String toString() {
+ return file.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java b/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
new file mode 100644
index 0000000..1c45892
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/common/CompositeServiceTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests for {@link CompositeService}.
+ */
+public class CompositeServiceTest {
+
+ @Test
+ public void testOrder() throws InterruptedException, ExecutionException, TimeoutException {
+ List<Service> services = Lists.newArrayList();
+
+ // Start 10 services and check their start sequence is ordered.
+ Semaphore semaphore = new Semaphore(0);
+ for (int i = 0; i < 10; i++) {
+ services.add(new TestService(semaphore, i));
+ }
+
+ Service service = new CompositeService(services);
+ service.start().get(5, TimeUnit.SECONDS);
+
+ // There should be 10 permits after all 10 services started
+ Assert.assertTrue(semaphore.tryAcquire(10, 5, TimeUnit.SECONDS));
+
+ // Check all services are running
+ Assert.assertTrue(Iterables.all(services, serviceStatePredicate(Service.State.RUNNING)));
+
+ // Release 10 permits for the stop sequence to start
+ semaphore.release(10);
+ service.stop().get(5, TimeUnit.SECONDS);
+
+ // There should be no permit left after all 10 services stopped
+ Assert.assertFalse(semaphore.tryAcquire(10));
+
+ // Check all services are stopped
+ Assert.assertTrue(Iterables.all(services, serviceStatePredicate(Service.State.TERMINATED)));
+ }
+
+ @Test
+ public void testErrorStart() throws InterruptedException {
+ List<Service> services = Lists.newArrayList();
+
+ // Create 5 services. The fourth one will fail to start.
+ Semaphore semaphore = new Semaphore(0);
+ for (int i = 0; i < 5; i++) {
+ services.add(new TestService(semaphore, i, i == 3));
+ }
+
+ Service service = new CompositeService(services);
+ try {
+ service.start().get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ // Expected
+ }
+
+ // Verify all services are not in running state
+ Assert.assertTrue(Iterables.all(services, Predicates.not(serviceStatePredicate(Service.State.RUNNING))));
+
+ // There should be one service in error state
+ Assert.assertTrue(Iterables.removeIf(services, serviceStatePredicate(Service.State.FAILED)));
+
+ for (Service s : services) {
+ Assert.assertNotEquals(3, ((TestService) s).getOrder());
+ }
+ }
+
+ private Predicate<Service> serviceStatePredicate(final Service.State state) {
+ return new Predicate<Service>() {
+ @Override
+ public boolean apply(Service service) {
+ return service.state() == state;
+ }
+ };
+ }
+
+ private static final class TestService extends AbstractIdleService {
+
+ private final Semaphore semaphore;
+ private final int order;
+ private final boolean startFail;
+
+ private TestService(Semaphore semaphore, int order) {
+ this(semaphore, order, false);
+ }
+
+ private TestService(Semaphore semaphore, int order, boolean startFail) {
+ this.semaphore = semaphore;
+ this.order = order;
+ this.startFail = startFail;
+ }
+
+ public int getOrder() {
+ return order;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ Preconditions.checkState(!startFail, "Fail to start service of order %s", order);
+
+ // Acquire all permits emitted by previous service
+ semaphore.acquire(order);
+ // Emit enough permits for next service
+ semaphore.release(order + 1);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ semaphore.acquire(order + 1);
+ semaphore.release(order);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index a6939e2..2f95e0e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -18,7 +18,6 @@
package org.apache.twill.internal;
import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
@@ -145,13 +144,6 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
Threads.createDaemonThreadFactory("message-callback"),
new ThreadPoolExecutor.DiscardPolicy());
- // Create the live node, if succeeded, start the service, otherwise fail out.
- createLiveNode().get();
-
- // Create node for messaging
- ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
- KeeperException.NodeExistsException.class, null).get();
-
// Watch for session expiration, recreate the live node if reconnected after expiration.
zkClient.addConnectionWatcher(new Watcher() {
private boolean expired = false;
@@ -169,6 +161,13 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
});
+ // Create the live node, if succeeded, start the service, otherwise fail out.
+ createLiveNode().get();
+
+ // Create node for messaging
+ ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null).get();
+
doStart();
// Starts watching for messages
@@ -186,10 +185,8 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
try {
doStop();
} finally {
- ListenableFuture<List<String>> removeCompletion = Futures.successfulAsList(ImmutableList.of(removeServiceNode(),
- removeLiveNode()));
// Given at most 5 seconds to cleanup ZK nodes
- removeCompletion.get(5, TimeUnit.SECONDS);
+ removeLiveNode().get(5, TimeUnit.SECONDS);
LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
}
}
@@ -213,12 +210,6 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
}
- private OperationFuture<String> removeServiceNode() {
- String serviceNode = String.format("/%s", runId.getId());
- LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
- return ZKOperations.recursiveDelete(zkClient, serviceNode);
- }
-
/**
* Watches for messages that are sent through ZK messages node.
*/
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
index 3f2d073..43268b7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -142,20 +142,19 @@ public final class ApplicationBundler {
} finally {
jarOut.close();
}
- LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
+ LOG.debug("copying temporary bundle to destination {} ({} bytes)", target, tmpJar.length());
// Copy the tmp jar into destination.
- OutputStream os = null;
try {
- os = new BufferedOutputStream(target.getOutputStream());
- Files.copy(tmpJar, os);
- } catch (IOException e) {
- throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
- } finally {
- if (os != null) {
+ OutputStream os = new BufferedOutputStream(target.getOutputStream());
+ try {
+ Files.copy(tmpJar, os);
+ } finally {
os.close();
}
+ } catch (IOException e) {
+ throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target, e);
}
- LOG.debug("finished creating bundle at {}", target.toURI());
+ LOG.debug("finished creating bundle at {}", target);
} finally {
tmpJar.delete();
LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index a891de6..1688073 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -98,7 +98,7 @@ public final class TwillContainerLauncher {
secureStoreLocation.length(), false, null));
}
} catch (IOException e) {
- LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI());
+ LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
}
if (afterResources == null) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 174c25d..c8becd3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -185,9 +185,9 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
public void forceFlush() {
try {
- publishLogs(2, TimeUnit.SECONDS);
+ scheduler.submit(flushTask).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
- LOG.error("Failed to publish last batch of log.", e);
+ LOG.error("Failed to force log flush in 2 seconds.", e);
}
}
@@ -279,7 +279,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
try {
int published = publishLogs(2L, TimeUnit.SECONDS);
if (LOG.isDebugEnabled()) {
- LOG.info("Published {} log messages to Kafka.", published);
+ LOG.debug("Published {} log messages to Kafka.", published);
}
} catch (Exception e) {
LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index ed86cb8..2ab97db 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -238,4 +238,9 @@ final class HDFSLocation implements Location {
public int hashCode() {
return path.hashCode();
}
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index ae86f42..fc6cb3c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -22,18 +22,30 @@ import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.classic.util.ContextInitializer;
import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.twill.api.RunId;
+import org.apache.twill.common.CompositeService;
import org.apache.twill.common.Services;
import org.apache.twill.filesystem.HDFSLocationFactory;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.logging.KafkaAppender;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +55,7 @@ import java.io.File;
import java.io.StringReader;
import java.net.URI;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* Class for main method that starts a service.
@@ -58,45 +71,54 @@ public abstract class ServiceMain {
}
}
- protected final void doMain(final ZKClientService zkClientService,
- final Service service) throws ExecutionException, InterruptedException {
+ protected final void doMain(final Service mainService,
+ Service...prerequisites) throws ExecutionException, InterruptedException {
configureLogger();
- final String serviceName = service.toString();
+ Service requiredServices = new CompositeService(prerequisites);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- Services.chainStop(service, zkClientService);
+ mainService.stopAndWait();
}
});
// Listener for state changes of the service
- ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+ ListenableFuture<Service.State> completion = Services.getCompletionFuture(mainService);
+ Throwable initFailure = null;
try {
try {
// Starts the service
- LOG.info("Starting service {}.", serviceName);
- Futures.allAsList(Services.chainStart(zkClientService, service).get()).get();
- LOG.info("Service {} started.", serviceName);
+ LOG.info("Starting service {}.", mainService);
+ Futures.allAsList(Services.chainStart(requiredServices, mainService).get()).get();
+ LOG.info("Service {} started.", mainService);
} catch (Throwable t) {
- LOG.error("Exception when starting service {}.", serviceName, t);
- // Exit with the init fail exit code.
- System.exit(ContainerExitCodes.INIT_FAILED);
+ LOG.error("Exception when starting service {}.", mainService, t);
+ initFailure = t;
}
try {
- completion.get();
- LOG.info("Service {} completed.", serviceName);
+ if (initFailure == null) {
+ completion.get();
+ LOG.info("Service {} completed.", mainService);
+ }
} catch (Throwable t) {
- LOG.error("Exception thrown from service {}.", serviceName, t);
+ LOG.error("Exception thrown from service {}.", mainService, t);
throw Throwables.propagate(t);
}
} finally {
+ requiredServices.stopAndWait();
+
ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
if (loggerFactory instanceof LoggerContext) {
((LoggerContext) loggerFactory).stop();
}
+
+ if (initFailure != null) {
+ // Exit with the init fail exit code.
+ System.exit(ContainerExitCodes.INIT_FAILED);
+ }
}
}
@@ -139,6 +161,17 @@ public abstract class ServiceMain {
}
}
+ /**
+ * Creates a {@link ZKClientService}.
+ */
+ protected static ZKClientService createZKClient(String zkConnectStr) {
+ return ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+ }
+
private void configureLogger() {
// Check if SLF4J is bound to logback in the current environment
ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
@@ -220,4 +253,34 @@ public abstract class ServiceMain {
}
return "OFF";
}
+
+ /**
+ * A simple service for creating/removing ZK paths needed for {@link AbstractTwillService}.
+ */
+ protected static final class TwillZKPathService extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
+ private static final long TIMEOUT_SECONDS = 5L;
+
+ private final ZKClient zkClient;
+ private final String path;
+
+ public TwillZKPathService(ZKClient zkClient, RunId runId) {
+ this.zkClient = zkClient;
+ this.path = "/" + runId.getId();
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOG.info("Creating container ZK path: {}{}", zkClient.getConnectString(), path);
+ ZKOperations.ignoreError(zkClient.create(path, null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOG.info("Removing container ZK path: {}{}", zkClient.getConnectString(), path);
+ ZKOperations.recursiveDelete(zkClient, path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 914f13f..561a68b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,7 +17,8 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.util.concurrent.Service;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -26,15 +27,23 @@ import org.apache.twill.internal.Constants;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
-import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
@@ -56,18 +65,23 @@ public final class ApplicationMasterMain extends ServiceMain {
File twillSpec = new File(Constants.Files.TWILL_SPEC);
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
- ZKClientService zkClientService =
- ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnect).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
+ ZKClientService zkClientService = createZKClient(zkConnect);
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf);
- Service service = new ApplicationMasterService(runId, zkClientService, twillSpec,
- new VersionDetectYarnAMClientFactory(conf), createAppLocation(conf));
- new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())).doMain(zkClientService, service);
+
+ final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
+ ApplicationMasterService service = new ApplicationMasterService(runId, zkClientService,
+ twillSpec, amClient, createAppLocation(conf));
+ TrackerService trackerService = new TrackerService(service);
+
+ new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+ .doMain(
+ service,
+ new YarnAMClientService(amClient, trackerService),
+ zkClientService,
+ new TwillZKPathService(zkClientService, runId),
+ new ApplicationKafkaService(zkClientService, runId)
+ );
}
/**
@@ -105,4 +119,108 @@ public final class ApplicationMasterMain extends ServiceMain {
protected String getRunnableName() {
return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
}
+
+
+ /**
+ * A service wrapper for starting/stopping {@link EmbeddedKafkaServer} that makes sure the ZK path for
+ * Kafka exists before starting the Kafka server.
+ */
+ private static final class ApplicationKafkaService extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);
+
+ private final ZKClient zkClient;
+ private final String kafkaZKPath;
+ private final EmbeddedKafkaServer kafkaServer;
+
+ private ApplicationKafkaService(ZKClient zkClient, RunId runId) {
+ this.zkClient = zkClient;
+ this.kafkaZKPath = "/" + runId.getId() + "/kafka";
+ this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString() + kafkaZKPath));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ ZKOperations.ignoreError(
+ zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, kafkaZKPath).get();
+ kafkaServer.startAndWait();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // Flush all logs before shutting down Kafka server
+ Loggings.forceFlush();
+ // Delay for 2 seconds to give clients a chance to poll the last batch of log messages.
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ } catch (InterruptedException e) {
+ // Ignore
+ LOG.info("Kafka shutdown delay interrupted", e);
+ } finally {
+ kafkaServer.stopAndWait();
+ }
+ }
+
+ private Properties generateKafkaConfig(String kafkaZKConnect) {
+ int port = Networks.getRandomPort();
+ Preconditions.checkState(port > 0, "Failed to get random port.");
+
+ Properties prop = new Properties();
+ prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+ prop.setProperty("port", Integer.toString(port));
+ prop.setProperty("broker.id", "1");
+ prop.setProperty("socket.send.buffer.bytes", "1048576");
+ prop.setProperty("socket.receive.buffer.bytes", "1048576");
+ prop.setProperty("socket.request.max.bytes", "104857600");
+ prop.setProperty("num.partitions", "1");
+ prop.setProperty("log.retention.hours", "24");
+ prop.setProperty("log.flush.interval.messages", "10000");
+ prop.setProperty("log.flush.interval.ms", "1000");
+ prop.setProperty("log.segment.bytes", "536870912");
+ prop.setProperty("zookeeper.connect", kafkaZKConnect);
+ prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ prop.setProperty("default.replication.factor", "1");
+ return prop;
+ }
+ }
+
+
+ /**
+ * A Service wrapper that starts {@link TrackerService} and {@link YarnAMClient}. It is needed because
+ * the tracker host and url need to be provided to {@link YarnAMClient} before the client is started.
+ */
+ private static final class YarnAMClientService extends AbstractIdleService {
+
+ private final YarnAMClient yarnAMClient;
+ private final TrackerService trackerService;
+
+ private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerService) {
+ this.yarnAMClient = yarnAMClient;
+ this.trackerService = trackerService;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ trackerService.setHost(yarnAMClient.getHost());
+ trackerService.startAndWait();
+
+ yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl());
+ try {
+ yarnAMClient.startAndWait();
+ } catch (Exception e) {
+ trackerService.stopAndWait();
+ throw e;
+ }
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ try {
+ yarnAMClient.stopAndWait();
+ } finally {
+ trackerService.stopAndWait();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index d8659dd..8f8952b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -23,7 +23,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -67,14 +66,10 @@ import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.utils.Instances;
-import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.internal.yarn.YarnAMClient;
-import org.apache.twill.internal.yarn.YarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnContainerInfo;
import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.internal.yarn.YarnUtils;
@@ -88,12 +83,10 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
-import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -103,7 +96,7 @@ import java.util.concurrent.TimeUnit;
/**
*
*/
-public final class ApplicationMasterService extends AbstractYarnTwillService {
+public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
@@ -116,7 +109,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
private final ApplicationMasterLiveNodeData amLiveNode;
private final RunningContainers runningContainers;
private final ExpectedContainers expectedContainers;
- private final TrackerService trackerService;
private final YarnAMClient amClient;
private final JvmOptions jvmOpts;
private final int reservedMemory;
@@ -125,19 +117,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
private final PlacementPolicyManager placementPolicyManager;
private volatile boolean stopped;
- private EmbeddedKafkaServer kafkaServer;
private Queue<RunnableContainerRequest> runnableContainerRequests;
private ExecutorService instanceChangeExecutor;
public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
- YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
+ YarnAMClient amClient, Location applicationLocation) throws Exception {
super(zkClient, runId, applicationLocation);
this.runId = runId;
this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
this.zkClient = zkClient;
this.applicationLocation = applicationLocation;
- this.amClient = amClientFactory.create();
+ this.amClient = amClient;
this.credentials = createCredentials();
this.jvmOpts = loadJvmOptions();
this.reservedMemory = getReservedMemory();
@@ -149,12 +140,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
expectedContainers = initExpectedContainers(twillSpec);
runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
- trackerService = new TrackerService(new Supplier<ResourceReport>() {
- @Override
- public ResourceReport get() {
- return runningContainers.getResourceReport();
- }
- }, amClient.getHost());
eventHandler = createEventHandler(twillSpec);
}
@@ -183,6 +168,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
}
}
+ @SuppressWarnings("unchecked")
private EventHandler createEventHandler(TwillSpecification twillSpec) {
try {
// Should be able to load by this class ClassLoader, as they packaged in the same jar.
@@ -219,6 +205,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
}
@Override
+ public ResourceReport get() {
+ return runningContainers.getResourceReport();
+ }
+
+ @Override
protected void doStart() throws Exception {
LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
@@ -227,31 +218,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
- kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig());
-
- // Must start tracker before start AMClient
- LOG.info("Starting application master tracker server");
- trackerService.startAndWait();
- URL trackerUrl = trackerService.getUrl();
- LOG.info("Started application master tracker server on " + trackerUrl);
-
- amClient.setTracker(trackerService.getBindAddress(), trackerUrl);
- amClient.startAndWait();
-
- // Creates ZK path for runnable and kafka logging service
- Futures.allAsList(ImmutableList.of(
- zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
- zkClient.create("/" + runId.getId() + "/kafka", null, CreateMode.PERSISTENT))
- ).get();
-
+ // Creates ZK path for runnable
+ zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
runningContainers.addWatcher("/discoverable");
-
- // Starts kafka server
- LOG.info("Starting kafka server");
-
- kafkaServer.startAndWait();
- LOG.info("Kafka server started");
-
runnableContainerRequests = initContainerRequests();
}
@@ -295,29 +264,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
TimeUnit.SECONDS.sleep(1);
}
- LOG.info("Stopping application master tracker server");
- try {
- trackerService.stopAndWait();
- LOG.info("Stopped application master tracker server");
- } catch (Exception e) {
- LOG.error("Failed to stop tracker service.", e);
- } finally {
- try {
- try {
- // App location cleanup
- cleanupDir();
- Loggings.forceFlush();
- // Sleep a short while to let kafka clients to have chance to fetch the log
- TimeUnit.SECONDS.sleep(1);
- } finally {
- kafkaServer.stopAndWait();
- LOG.info("Kafka server stopped");
- }
- } finally {
- // Stops the AMClient
- amClient.stopAndWait();
- }
- }
+ cleanupDir();
}
@Override
@@ -431,7 +378,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
(System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
// Clear the blacklist for the pending provision request(s).
- clearBlacklist();
+ amClient.clearBlacklist();
isRequestRelaxed = true;
}
@@ -447,7 +394,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
* Manage Blacklist for a given request.
*/
private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
- clearBlacklist();
+ amClient.clearBlacklist();
//Check the allocation strategy
AllocationSpecification currentAllocationSpecification = request.getKey();
@@ -466,8 +413,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
// Yarn Resource Manager may include port in the node name depending on the setting
// YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
// the names (with and without port) to the blacklist.
- addToBlacklist(containerInfo.getHost().getHostName());
- addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
+ amClient.addToBlacklist(containerInfo.getHost().getHostName());
+ amClient.addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
}
}
}
@@ -642,27 +589,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
}
/**
- * Add a resource to the blacklist.
- */
- private void addToBlacklist(String resource) {
- amClient.addToBlacklist(resource);
- }
-
- /**
- * Remove a resource from the blacklist.
- */
- private void removeFromBlacklist(String resource) {
- amClient.removeFromBlacklist(resource);
- }
-
- /**
- * Clears the resource blacklist.
- */
- private void clearBlacklist() {
- amClient.clearBlacklist();
- }
-
- /**
* Launches runnables in the provisioned containers.
*/
private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -735,28 +661,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService {
return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
}
- private Properties generateKafkaConfig() {
- int port = Networks.getRandomPort();
- Preconditions.checkState(port > 0, "Failed to get random port.");
-
- Properties prop = new Properties();
- prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
- prop.setProperty("port", Integer.toString(port));
- prop.setProperty("broker.id", "1");
- prop.setProperty("socket.send.buffer.bytes", "1048576");
- prop.setProperty("socket.receive.buffer.bytes", "1048576");
- prop.setProperty("socket.request.max.bytes", "104857600");
- prop.setProperty("num.partitions", "1");
- prop.setProperty("log.retention.hours", "24");
- prop.setProperty("log.flush.interval.messages", "10000");
- prop.setProperty("log.flush.interval.ms", "1000");
- prop.setProperty("log.segment.bytes", "536870912");
- prop.setProperty("zookeeper.connect", getKafkaZKConnect());
- prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
- prop.setProperty("default.replication.factor", "1");
- return prop;
- }
-
/**
* Attempts to change the number of running instances.
* @return {@code true} if the message does requests for changes in number of running instances of a runnable,
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
index 6165c6f..7c09c58 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -64,6 +64,7 @@ import java.net.URL;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.GET;
/**
* Webservice that the Application Master will register back to the resource manager
@@ -80,36 +81,43 @@ public final class TrackerService extends AbstractIdleService {
private static final int CLOSE_CHANNEL_TIMEOUT = 5;
private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
- private final String host;
+ private final Supplier<ResourceReport> resourceReport;
+ private final ChannelGroup channelGroup;
+
+ private String host;
private ServerBootstrap bootstrap;
private InetSocketAddress bindAddress;
private URL url;
- private final ChannelGroup channelGroup;
- private final Supplier<ResourceReport> resourceReport;
/**
* Initialize the service.
*
* @param resourceReport live report that the service will return to clients.
- * @param appMasterHost the application master host.
*/
- public TrackerService(Supplier<ResourceReport> resourceReport, String appMasterHost) {
+ TrackerService(Supplier<ResourceReport> resourceReport) {
this.channelGroup = new DefaultChannelGroup("appMasterTracker");
this.resourceReport = resourceReport;
- this.host = appMasterHost;
+ }
+
+ /**
+ * Sets the hostname which the tracker service will bind to. This method must be called before starting this
+ * tracker service.
+ */
+ void setHost(String host) {
+ this.host = host;
}
/**
* Returns the address this tracker service is bounded to.
*/
- public InetSocketAddress getBindAddress() {
+ InetSocketAddress getBindAddress() {
return bindAddress;
}
/**
* @return tracker url.
*/
- public URL getUrl() {
+ URL getUrl() {
return url;
}
@@ -138,7 +146,7 @@ public final class TrackerService extends AbstractIdleService {
pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("compressor", new HttpContentCompressor());
- pipeline.addLast("handler", new ReportHandler(resourceReport));
+ pipeline.addLast("handler", new ReportHandler());
return pipeline;
}
@@ -148,6 +156,8 @@ public final class TrackerService extends AbstractIdleService {
bindAddress = (InetSocketAddress) channel.getLocalAddress();
url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort())).toURL();
channelGroup.add(channel);
+
+ LOG.info("Tracker service started at {}", url);
}
@Override
@@ -159,18 +169,17 @@ public final class TrackerService extends AbstractIdleService {
} finally {
bootstrap.releaseExternalResources();
}
+ LOG.info("Tracker service stopped at {}", url);
}
/**
* Handler to return resources used by this application master, which will be available through
* the host and port set when this application master registered itself to the resource manager.
*/
- public class ReportHandler extends SimpleChannelUpstreamHandler {
- private final Supplier<ResourceReport> report;
+ final class ReportHandler extends SimpleChannelUpstreamHandler {
private final ResourceReportAdapter reportAdapter;
- public ReportHandler(Supplier<ResourceReport> report) {
- this.report = report;
+ public ReportHandler() {
this.reportAdapter = ResourceReportAdapter.create();
}
@@ -202,7 +211,7 @@ public final class TrackerService extends AbstractIdleService {
ChannelBuffer content = ChannelBuffers.dynamicBuffer();
Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
- reportAdapter.toJson(report.get(), writer);
+ reportAdapter.toJson(resourceReport.get(), writer);
try {
writer.close();
} catch (IOException e1) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 5c9aa45..bb3d200 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,10 +44,9 @@ import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.json.ArgumentsCodec;
import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +56,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Reader;
-import java.util.concurrent.TimeUnit;
/**
*
@@ -81,11 +80,7 @@ public final class TwillContainerMain extends ServiceMain {
int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
- ZKClientService zkClientService = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
+ ZKClientService zkClientService = createZKClient(zkConnectStr);
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
ZKClient electionZKClient = getAppRunZKClient(zkClientService, appRunId);
@@ -111,7 +106,11 @@ public final class TwillContainerMain extends ServiceMain {
Service service = new TwillContainerService(context, containerInfo, containerZKClient,
runId, runnableSpec, getClassLoader(),
createAppLocation(conf));
- new TwillContainerMain().doMain(zkClientService, service);
+ new TwillContainerMain().doMain(
+ service,
+ new LogFlushService(),
+ zkClientService,
+ new TwillZKPathService(containerZKClient, runId));
}
private static void loadSecureStore() throws IOException {
@@ -192,4 +191,23 @@ public final class TwillContainerMain extends ServiceMain {
protected String getRunnableName() {
return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
}
+
+
+ /**
+ * Simple service that force-flushes logs on stop.
+ */
+ private static final class LogFlushService extends AbstractService {
+
+ @Override
+ protected void doStart() {
+ // No-op
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ Loggings.forceFlush();
+ notifyStopped();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index dc3761f..96031ce 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -30,7 +30,6 @@ import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.BasicTwillContext;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.ContainerLiveNodeData;
-import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.utils.Instances;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
@@ -134,9 +133,15 @@ public final class TwillContainerService extends AbstractYarnTwillService {
@Override
protected void doStop() throws Exception {
commandExecutor.shutdownNow();
- runnable.destroy();
- context.stop();
- Loggings.forceFlush();
+ try {
+ runnable.destroy();
+ } catch (Throwable t) {
+ // Just catch the exception and do not propagate it, since we are already in the shutdown sequence and
+ // we want all twill services to be shut down properly.
+ LOG.warn("Exception when calling runnable.destroy.", t);
+ } finally {
+ context.stop();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
index 4704345..961bc30 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnTwillService.java
@@ -93,7 +93,7 @@ public abstract class AbstractYarnTwillService extends AbstractTwillService {
UserGroupInformation.getCurrentUser().addCredentials(credentials);
this.credentials = credentials;
- LOG.info("Secure store updated from {}.", location.toURI());
+ LOG.info("Secure store updated from {}.", location);
} catch (Throwable t) {
LOG.error("Failed to update secure store.", t);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 0e0fc75..e78889a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -329,7 +329,7 @@ final class YarnTwillPreparer implements TwillPreparer {
List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, locationFactory, credentials);
for (Token<?> token : tokens) {
- LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation().toURI(), token);
+ LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation(), token);
}
} catch (IOException e) {
LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 6a33131..b2981e4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -601,7 +601,7 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
// Rename the tmp file into the credentials location
tmpLocation.renameTo(credentialsLocation);
- LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
+ LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation);
}
private static LocationFactory createDefaultLocationFactory(Configuration configuration) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
index 39813cc..01b599b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
@@ -21,11 +21,16 @@ import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Services;
+import org.junit.Assert;
import org.junit.Test;
import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,11 +43,33 @@ public class InitializeFailTestRun extends BaseYarnTest {
@Test
public void testInitFail() throws InterruptedException, ExecutionException, TimeoutException {
TwillRunner runner = YarnTestUtils.getTwillRunner();
- TwillController controller = runner.prepare(new InitFailRunnable())
- .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
- .start();
+ final CountDownLatch logLatch = new CountDownLatch(1);
+
+ // Verify that it receives the exception log entry that is thrown when the runnable initializes
+ LogHandler logVerifyHandler = new LogHandler() {
+ @Override
+ public void onLog(LogEntry logEntry) {
+ LogThrowable logThrowable = logEntry.getThrowable();
+ if (logThrowable != null) {
+ while (logThrowable.getCause() != null) {
+ logThrowable = logThrowable.getCause();
+ }
+ if (IllegalStateException.class.getName().equals(logThrowable.getClassName())
+ && logThrowable.getMessage().contains("Fail to init")) {
+ logLatch.countDown();
+ }
+ }
+ }
+ };
+
+ TwillController controller = runner
+ .prepare(new InitFailRunnable())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
+ .addLogHandler(logVerifyHandler)
+ .start();
Services.getCompletionFuture(controller).get(2, TimeUnit.MINUTES);
+ Assert.assertTrue(logLatch.await(10, TimeUnit.SECONDS));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
index f378606..6e2046c 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
@@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
@@ -94,7 +95,7 @@ public class SessionExpireTestRun extends BaseYarnTest {
QueryExp query = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName()));
Stopwatch stopwatch = new Stopwatch();
-
+ stopwatch.start();
do {
// Find the AM session and expire it
Set<ObjectName> connectionBeans = mbeanServer.queryNames(ObjectName.WILDCARD, query);
@@ -111,6 +112,7 @@ public class SessionExpireTestRun extends BaseYarnTest {
}
}
}
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} while (stopwatch.elapsedTime(timeoutUnit) < timeout);
return false;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/5dfe40d3/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 73ca274..0dde3be 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -424,18 +424,19 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
public void process(WatchedEvent event) {
try {
if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
- LOG.debug("Connected to ZooKeeper: " + zkStr);
+ LOG.debug("Connected to ZooKeeper: {}", zkStr);
notifyStarted();
return;
}
if (event.getState() == Event.KeeperState.Expired) {
- LOG.info("ZooKeeper session expired: " + zkStr);
+ LOG.info("ZooKeeper session expired: {}", zkStr);
// When connection expired, simply reconnect again
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
+ LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
zooKeeper.set(createZooKeeper());
} catch (IOException e) {
zooKeeper.set(null);