You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:02 UTC
[24/43] tez git commit: TEZ-2090. Add tests for jobs running in
external services. (sseth)
TEZ-2090. Add tests for jobs running in external services. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aadd0492
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aadd0492
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aadd0492
Branch: refs/heads/TEZ-2003
Commit: aadd04927b0cc646db2579f17f903d8e24916bdc
Parents: 7b71d3b
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 13 17:24:05 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:01 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
pom.xml | 6 +
.../apache/tez/dag/api/TezConfiguration.java | 2 +
.../apache/tez/dag/api/TaskCommunicator.java | 1 +
.../tez/dag/api/TaskCommunicatorContext.java | 3 +
.../tez/dag/app/TezTaskCommunicatorImpl.java | 42 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 +-
tez-ext-service-tests/pom.xml | 161 ++++
.../tez/dag/app/TezTestServiceCommunicator.java | 152 ++++
.../TezTestServiceContainerLauncher.java | 144 ++++
.../TezTestServiceNoOpContainerLauncher.java | 66 ++
.../rm/TezTestServiceTaskSchedulerService.java | 347 ++++++++
.../TezTestServiceTaskCommunicatorImpl.java | 182 ++++
.../org/apache/tez/service/ContainerRunner.java | 27 +
.../tez/service/MiniTezTestServiceCluster.java | 163 ++++
.../service/TezTestServiceConfConstants.java | 41 +
.../TezTestServiceProtocolBlockingPB.java | 22 +
.../tez/service/impl/ContainerRunnerImpl.java | 512 +++++++++++
.../apache/tez/service/impl/TezTestService.java | 126 +++
.../impl/TezTestServiceProtocolClientImpl.java | 82 ++
.../impl/TezTestServiceProtocolServerImpl.java | 133 +++
.../tez/shufflehandler/FadvisedChunkedFile.java | 78 ++
.../tez/shufflehandler/FadvisedFileRegion.java | 160 ++++
.../apache/tez/shufflehandler/IndexCache.java | 199 +++++
.../tez/shufflehandler/ShuffleHandler.java | 840 +++++++++++++++++++
.../tez/tests/TestExternalTezServices.java | 183 ++++
.../org/apache/tez/util/ProtoConverters.java | 172 ++++
.../src/test/proto/TezDaemonProtocol.proto | 84 ++
.../src/test/resources/log4j.properties | 19 +
.../org/apache/tez/runtime/task/TezChild.java | 2 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 2 +-
31 files changed, 3943 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d7e4be5..975ce65 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -1,5 +1,6 @@
ALL CHANGES:
TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
TEZ-2006. Task communication plane needs to be pluggable.
+ TEZ-2090. Add tests for jobs running in external services.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ce4fa13..ca9db11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,11 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-ext-service-tests</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
@@ -638,6 +643,7 @@
<module>tez-ui</module>
<module>tez-plugins</module>
<module>tez-tools</module>
+ <module>tez-ext-service-tests</module>
<module>tez-dist</module>
<module>docs</module>
</modules>
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 01e724e..1cd478e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1172,6 +1172,8 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
@ConfigurationScope(Scope.VERTEX)
public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
// TODO only validate property here, value can also be validated if necessary
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 97f9c16..c9f85e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.api;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 9b2d889..41675fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -44,5 +44,8 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Move to vertex, taskIndex, version
void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
+ // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
+ // This will have to take into consideration the TA_FAILED event
+
// TODO Eventually Add methods to report availability stats to the scheduler.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 5652937..258c927 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -74,16 +74,22 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
new ConcurrentHashMap<TaskAttempt, ContainerId>();
private final TezTaskUmbilicalProtocol taskUmbilical;
+ private final String tokenIdentifier;
+ private final Token<JobTokenIdentifier> sessionToken;
private InetSocketAddress address;
private Server server;
- private static final class ContainerInfo {
+ public static final class ContainerInfo {
- ContainerInfo(ContainerId containerId) {
+ ContainerInfo(ContainerId containerId, String host, int port) {
this.containerId = containerId;
+ this.host = host;
+ this.port = port;
}
- ContainerId containerId;
+ final ContainerId containerId;
+ public final String host;
+ public final int port;
TezHeartbeatResponse lastResponse = null;
TaskSpec taskSpec = null;
long lastRequestId = 0;
@@ -110,6 +116,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
super(TezTaskCommunicatorImpl.class.getName());
this.taskCommunicatorContext = taskCommunicatorContext;
this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
+ this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+ this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
}
@@ -130,9 +138,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
- Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
- jobTokenSecretManager.addTokenForJob(
- taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken);
+ jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
server = new RPC.Builder(conf)
.setProtocol(TezTaskUmbilicalProtocol.class)
@@ -182,7 +188,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
public void registerRunningContainer(ContainerId containerId, String host, int port) {
- ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId));
+ ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
if (oldInfo != null) {
throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
}
@@ -230,9 +236,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
". Already registered to containerId: " + oldId);
}
}
-
}
+
@Override
public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
@@ -258,6 +264,18 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return address;
}
+ protected String getTokenIdentifier() {
+ return tokenIdentifier;
+ }
+
+ protected Token<JobTokenIdentifier> getSessionToken() {
+ return sessionToken;
+ }
+
+ protected TaskCommunicatorContext getTaskCommunicatorContext() {
+ return taskCommunicatorContext;
+ }
+
public TezTaskUmbilicalProtocol getUmbilical() {
return this.taskUmbilical;
}
@@ -471,4 +489,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}';
}
}
+
+ protected ContainerInfo getContainerInfo(ContainerId containerId) {
+ return registeredContainers.get(containerId);
+ }
+
+ protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
+ return attemptToContainerMap.get(new TaskAttempt(taskAttemptId));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 62f82db..8c3ed87 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -350,7 +350,7 @@ public class TaskSchedulerEventHandler extends AbstractService
try {
Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
.getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
- Integer.class, String.class, Configuration.class);
+ int.class, String.class, Configuration.class);
ctor.setAccessible(true);
TaskSchedulerService taskSchedulerService =
ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
new file mode 100644
index 0000000..37f68b1
--- /dev/null
+++ b/tez-ext-service-tests/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>tez</artifactId>
+ <groupId>org.apache.tez</groupId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- TODO TEZ-2003 Merge this into the tez-tests module -->
+ <artifactId>tez-ext-service-tests</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ </dependency>
+ <dependency>
+ <!-- Required for the ShuffleHandler -->
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <!--
+ Include all files in src/main/resources. By default, do not apply property
+ substitution (filtering=false), but do apply property substitution to
+ version-info.properties (filtering=true). This will substitute the
+ version information correctly, but prevent Maven from altering other files.
+ -->
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <excludes>
+ <exclude>tez-api-version-info.properties</exclude>
+ </excludes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>tez-api-version-info.properties</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/test/proto</param>
+ <param>${basedir}/../tez-api/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/test/proto</directory>
+ <includes>
+ <include>TezDaemonProtocol.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-test-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
new file mode 100644
index 0000000..ac50878
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+public class TezTestServiceCommunicator extends AbstractService {
+
+ private final ConcurrentMap<String, TezTestServiceProtocolBlockingPB> hostProxies;
+ private final ListeningExecutorService executor;
+
+ // TODO Convert this into a singleton
+ public TezTestServiceCommunicator(int numThreads) {
+ super(TezTestServiceCommunicator.class.getSimpleName());
+ ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder().setNameFormat("TezTestServiceCommunicator #%2d").build());
+ this.hostProxies = new ConcurrentHashMap<String, TezTestServiceProtocolBlockingPB>();
+ executor = MoreExecutors.listeningDecorator(localExecutor);
+ }
+
+ @Override
+ public void serviceStop() {
+ executor.shutdownNow();
+ }
+
+
+ public void runContainer(RunContainerRequestProto request, String host, int port,
+ final ExecuteRequestCallback<RunContainerResponseProto> callback) {
+ ListenableFuture<RunContainerResponseProto> future = executor.submit(new RunContainerCallable(request, host, port));
+ Futures.addCallback(future, new FutureCallback<RunContainerResponseProto>() {
+ @Override
+ public void onSuccess(RunContainerResponseProto result) {
+ callback.setResponse(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.indicateError(t);
+ }
+ });
+
+ }
+
+ public void submitWork(SubmitWorkRequestProto request, String host, int port,
+ final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+ ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(request, host, port));
+ Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
+ @Override
+ public void onSuccess(SubmitWorkResponseProto result) {
+ callback.setResponse(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.indicateError(t);
+ }
+ });
+
+ }
+
+
+ private class RunContainerCallable implements Callable<RunContainerResponseProto> {
+
+ final String hostname;
+ final int port;
+ final RunContainerRequestProto request;
+
+ private RunContainerCallable(RunContainerRequestProto request, String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ this.request = request;
+ }
+
+ @Override
+ public RunContainerResponseProto call() throws Exception {
+ return getProxy(hostname, port).runContainer(null, request);
+ }
+ }
+
+ private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+ final String hostname;
+ final int port;
+ final SubmitWorkRequestProto request;
+
+ private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ this.request = request;
+ }
+
+ @Override
+ public SubmitWorkResponseProto call() throws Exception {
+ return getProxy(hostname, port).submitWork(null, request);
+ }
+ }
+
+ public interface ExecuteRequestCallback<T extends Message> {
+ void setResponse(T response);
+ void indicateError(Throwable t);
+ }
+
+ private TezTestServiceProtocolBlockingPB getProxy(String hostname, int port) {
+ String hostId = getHostIdentifier(hostname, port);
+
+ TezTestServiceProtocolBlockingPB proxy = hostProxies.get(hostId);
+ if (proxy == null) {
+ proxy = new TezTestServiceProtocolClientImpl(getConfig(), hostname, port);
+ TezTestServiceProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+ if (proxyOld != null) {
+ // TODO Shutdown the new proxy.
+ proxy = proxyOld;
+ }
+ }
+ return proxy;
+ }
+
+ private String getHostIdentifier(String hostname, int port) {
+ return hostname + ":" + port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
new file mode 100644
index 0000000..e83165b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+
+ // TODO Support interruptability of tasks which haven't yet been launched.
+
+ // TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer
+
+ static final Log LOG = LogFactory.getLog(TezTestServiceContainerLauncher.class);
+
+ private final AppContext context;
+ private final String tokenIdentifier;
+ private final TaskAttemptListener tal;
+ private final int servicePort;
+ private final TezTestServiceCommunicator communicator;
+ private final Clock clock;
+
+
+ // Configuration passed in here to set up final parameters
+ public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
+ TaskAttemptListener tal) {
+ super(TezTestServiceContainerLauncher.class.getName());
+ this.clock = appContext.getClock();
+ int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
+
+ this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+ Preconditions.checkArgument(servicePort > 0,
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
+ this.communicator = new TezTestServiceCommunicator(numThreads);
+ this.context = appContext;
+ this.tokenIdentifier = context.getApplicationID().toString();
+ this.tal = tal;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ communicator.init(conf);
+ }
+
+ @Override
+ public void serviceStart() {
+ communicator.start();
+ }
+
+ @Override
+ public void serviceStop() {
+ communicator.stop();
+ }
+
+ @Override
+ public void handle(NMCommunicatorEvent event) {
+ switch (event.getType()) {
+ case CONTAINER_LAUNCH_REQUEST:
+ final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+ RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
+ communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
+ launchEvent.getNodeId().getPort(),
+ new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+ @Override
+ public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+ LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
+ context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+ ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+ launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+ context.getHistoryHandler().handle(new DAGHistoryEvent(
+ null, lEvt));
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t);
+ sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
+ }
+ });
+ break;
+ case CONTAINER_STOP_REQUEST:
+ LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+ // that the container is actually done (normally received from RM)
+ // TODO Sending this out for an un-launched container is invalid
+ context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+ AMContainerEventType.C_NM_STOP_SENT));
+ break;
+ }
+ }
+
+ private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+ RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
+ builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+ builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId());
+ builder.setApplicationIdString(
+ event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString());
+ builder.setTokenIdentifier(tokenIdentifier);
+ builder.setContainerIdString(event.getContainer().getId().toString());
+ builder.setCredentialsBinary(
+ ByteString.copyFrom(event.getContainerLaunchContext().getTokens()));
+ // TODO Avoid reading this from the environment
+ builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+ return builder.build();
+ }
+
+ @SuppressWarnings("unchecked")
+ void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
+ context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
new file mode 100644
index 0000000..8c8e486
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+
+ static final Log LOG = LogFactory.getLog(TezTestServiceNoOpContainerLauncher.class);
+
+ private final AppContext context;
+ private final Clock clock;
+
+ public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
+ TaskAttemptListener tal) {
+ super(TezTestServiceNoOpContainerLauncher.class.getName());
+ this.context = appContext;
+ this.clock = appContext.getClock();
+ }
+
+ @Override
+ public void handle(NMCommunicatorEvent event) {
+ switch(event.getType()) {
+ case CONTAINER_LAUNCH_REQUEST:
+ final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+ LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
+ context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+ ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+ launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+ context.getHistoryHandler().handle(new DAGHistoryEvent(
+ null, lEvt));
+ break;
+ case CONTAINER_STOP_REQUEST:
+ LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+ context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+ AMContainerEventType.C_NM_STOP_SENT));
+ break;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
new file mode 100644
index 0000000..e3c18bf
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.service.TezTestServiceConfConstants;
+
+
+// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
+
+public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+
+ private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
+
+ private final ExecutorService appCallbackExecutor;
+ private final TaskSchedulerAppCallback appClientDelegate;
+ private final AppContext appContext;
+ private final List<String> serviceHosts;
+ private final ContainerFactory containerFactory;
+ private final Random random = new Random();
+ // Currently all services must be running on the same port.
+ private final int containerPort;
+
+ private final String clientHostname;
+ private final int clientPort;
+ private final String trackingUrl;
+ private final AtomicBoolean isStopped = new AtomicBoolean(false);
+ private final ConcurrentMap<Object, ContainerId> runningTasks =
+ new ConcurrentHashMap<Object, ContainerId>();
+
+ private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+
+ // Per instance
+ private final int memoryPerInstance;
+ private final int coresPerInstance;
+ private final int executorsPerInstance;
+
+ // Per Executor Thread
+ private final Resource resourcePerContainer;
+
+
+ public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
+ AppContext appContext,
+ String clientHostname, int clientPort,
+ String trackingUrl,
+ Configuration conf) {
+ // Accepting configuration here to allow setting up fields as final
+ super(TezTestServiceTaskSchedulerService.class.getName());
+ this.appCallbackExecutor = createAppCallbackExecutorService();
+ this.appClientDelegate = createAppCallbackDelegate(appClient);
+ this.appContext = appContext;
+ this.serviceHosts = new LinkedList<String>();
+ this.containerFactory = new ContainerFactory(appContext);
+
+ this.memoryPerInstance = conf
+ .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
+ Preconditions.checkArgument(memoryPerInstance > 0,
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
+ " must be configured");
+
+ this.executorsPerInstance = conf.getInt(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+ -1);
+ Preconditions.checkArgument(executorsPerInstance > 0,
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
+ " must be configured");
+
+ this.coresPerInstance = conf
+ .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
+ executorsPerInstance);
+
+ this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+ Preconditions.checkArgument(executorsPerInstance > 0,
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
+
+ this.clientHostname = clientHostname;
+ this.clientPort = clientPort;
+ this.trackingUrl = trackingUrl;
+
+ int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
+ int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
+ this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
+ this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
+
+ String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
+ if (hosts == null || hosts.length == 0) {
+ hosts = new String[]{"localhost"};
+ }
+ for (String host : hosts) {
+ serviceHosts.add(host);
+ }
+
+ LOG.info("Running with configuration: " +
+ "memoryPerInstance=" + memoryPerInstance +
+ ", vcoresPerInstance=" + coresPerInstance +
+ ", executorsPerInstance=" + executorsPerInstance +
+ ", resourcePerContainerInferred=" + resourcePerContainer +
+ ", hosts=" + serviceHosts.toString());
+
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ amRmClient.init(conf);
+ }
+
+ @Override
+ public void serviceStart() {
+ amRmClient.start();
+ RegisterApplicationMasterResponse response;
+ try {
+ amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
+ } catch (YarnException e) {
+ throw new TezUncheckedException(e);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ @Override
+ public void serviceStop() {
+ if (!this.isStopped.getAndSet(true)) {
+
+ try {
+ TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+ amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
+ status.postCompletionTrackingUrl);
+ } catch (YarnException e) {
+ throw new TezUncheckedException(e);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ appCallbackExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+ return Resource
+ .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+ serviceHosts.size() * coresPerInstance);
+ }
+
+ @Override
+ public int getClusterNodeCount() {
+ return serviceHosts.size();
+ }
+
+ @Override
+ public void resetMatchLocalityForAllHeldContainers() {
+ }
+
+ @Override
+ public Resource getTotalResources() {
+ return Resource
+ .newInstance(Ints.checkedCast(serviceHosts.size() * memoryPerInstance),
+ serviceHosts.size() * coresPerInstance);
+ }
+
+ @Override
+ public void blacklistNode(NodeId nodeId) {
+ LOG.info("DEBUG: BlacklistNode not supported");
+ }
+
+ @Override
+ public void unblacklistNode(NodeId nodeId) {
+ LOG.info("DEBUG: unBlacklistNode not supported");
+ }
+
+ @Override
+ public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+ Priority priority, Object containerSignature, Object clientCookie) {
+ String host = selectHost(hosts);
+ Container container =
+ containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+ runningTasks.put(task, container.getId());
+ appClientDelegate.taskAllocated(task, clientCookie, container);
+ }
+
+
+ @Override
+ public void allocateTask(Object task, Resource capability, ContainerId containerId,
+ Priority priority, Object containerSignature, Object clientCookie) {
+ String host = selectHost(null);
+ Container container =
+ containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+ runningTasks.put(task, container.getId());
+ appClientDelegate.taskAllocated(task, clientCookie, container);
+ }
+
+ @Override
+ public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ ContainerId containerId = runningTasks.remove(task);
+ if (containerId == null) {
+ LOG.error("Could not determine ContainerId for task: " + task +
+ " . Could have hit a race condition. Ignoring." +
+ " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+ return false;
+ }
+ appClientDelegate.containerBeingReleased(containerId);
+ return true;
+ }
+
+ @Override
+ public Object deallocateContainer(ContainerId containerId) {
+ LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+ return null;
+ }
+
+ @Override
+ public void setShouldUnregister() {
+
+ }
+
+ @Override
+ public boolean hasUnregistered() {
+ // Nothing to do. No registration involved.
+ return true;
+ }
+
+ private ExecutorService createAppCallbackExecutorService() {
+ return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+ }
+
+ private TaskSchedulerAppCallback createAppCallbackDelegate(
+ TaskSchedulerAppCallback realAppClient) {
+ return new TaskSchedulerAppCallbackWrapper(realAppClient,
+ appCallbackExecutor);
+ }
+
+ private String selectHost(String[] requestedHosts) {
+ String host = null;
+ if (requestedHosts != null && requestedHosts.length > 0) {
+ Arrays.sort(requestedHosts);
+ host = requestedHosts[0];
+ LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(requestedHosts));
+ } else {
+ host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+ LOG.info("Selected random host: " + host + " since the request contained no host information");
+ }
+ return host;
+ }
+
+ static class ContainerFactory {
+ final AppContext appContext;
+ AtomicInteger nextId;
+
+ public ContainerFactory(AppContext appContext) {
+ this.appContext = appContext;
+ this.nextId = new AtomicInteger(2);
+ }
+
+ public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+ ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+ NodeId nodeId = NodeId.newInstance(hostname, port);
+ String nodeHttpAddress = "hostname:0";
+
+ Container container = Container.newInstance(containerId,
+ nodeId,
+ nodeHttpAddress,
+ capability,
+ priority,
+ null);
+
+ return container;
+ }
+ }
+
+ private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+
+ }
+
+ @Override
+ public void onShutdownRequest() {
+
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+
+ }
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
new file mode 100644
index 0000000..78cdcde
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+import org.apache.tez.util.ProtoConverters;
+
+
+public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+ private static final Log LOG = LogFactory.getLog(TezTestServiceTaskCommunicatorImpl.class);
+
+ private final TezTestServiceCommunicator communicator;
+ private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
+ private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+ public TezTestServiceTaskCommunicatorImpl(
+ TaskCommunicatorContext taskCommunicatorContext) {
+ super(taskCommunicatorContext);
+ // TODO Maybe make this configurable
+ this.communicator = new TezTestServiceCommunicator(3);
+
+ SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+ // TODO Avoid reading this from the environment
+ baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+ baseBuilder.setApplicationIdString(
+ taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+ baseBuilder
+ .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+ baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+ BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
+
+ credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ this.communicator.init(conf);
+ }
+
+ @Override
+ public void serviceStart() {
+ super.serviceStart();
+ this.communicator.start();
+ }
+
+ @Override
+ public void serviceStop() {
+ super.serviceStop();
+ }
+
+
+ @Override
+ public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+ super.registerRunningContainer(containerId, hostname, port);
+ }
+
+ @Override
+ public void registerContainerEnd(ContainerId containerId) {
+ super.registerContainerEnd(containerId);
+ }
+
+ @Override
+ public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+ Map<String, LocalResource> additionalResources,
+ Credentials credentials,
+ boolean credentialsChanged) {
+ super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+ credentialsChanged);
+ SubmitWorkRequestProto requestProto = null;
+ try {
+ requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to construct request", e);
+ }
+ ContainerInfo containerInfo = getContainerInfo(containerId);
+ String host;
+ int port;
+ if (containerInfo != null) {
+ synchronized (containerInfo) {
+ host = containerInfo.host;
+ port = containerInfo.port;
+ }
+ } else {
+ // TODO Handle this properly
+ throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+ ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+ }
+ communicator.submitWork(requestProto, host, port,
+ new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+ @Override
+ public void setResponse(SubmitWorkResponseProto response) {
+ LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+ getTaskCommunicatorContext()
+ .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+ LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+ containerId, t);
+ }
+ });
+ }
+
+ @Override
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+ super.unregisterRunningTaskAttempt(taskAttemptID);
+ // Nothing else to do for now. The push API in the test does not support termination of a running task
+ }
+
+ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+ TaskSpec taskSpec) throws
+ IOException {
+ SubmitWorkRequestProto.Builder builder =
+ SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST);
+ builder.setContainerIdString(containerId.toString());
+ builder.setAmHost(getAddress().getHostName());
+ builder.setAmPort(getAddress().getPort());
+ Credentials taskCredentials = new Credentials();
+ // Credentials can change across DAGs. Ideally construct only once per DAG.
+ taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+
+ ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+ if (credentialsBinary == null) {
+ credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+ credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+ } else {
+ credentialsBinary = credentialsBinary.duplicate();
+ }
+ builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
+ return builder.build();
+ }
+
+ private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+ Credentials containerCredentials = new Credentials();
+ containerCredentials.addAll(credentials);
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+ ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+ containerTokens_dob.getLength());
+ return containerCredentialsBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
new file mode 100644
index 0000000..2bca4ed
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.IOException;
+
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+
+public interface ContainerRunner {
+
+ void queueContainer(RunContainerRequestProto request) throws IOException;
+ void submitWork(SubmitWorkRequestProto request) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
new file mode 100644
index 0000000..f47bd67
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.service.impl.TezTestService;
+
+public class MiniTezTestServiceCluster extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(MiniTezTestServiceCluster.class);
+
+ private final File testWorkDir;
+ private final long availableMemory;
+ private final int numExecutorsPerService;
+ private final String[] localDirs;
+ private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+ private TezTestService tezTestService;
+
+ public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+ return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
+ }
+
+ // TODO Add support for multiple instances
+ private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+ super(clusterName + "_TezTestServerCluster");
+ Preconditions.checkArgument(numExecutorsPerService > 0);
+ Preconditions.checkArgument(availableMemory > 0);
+ Preconditions.checkArgument(numLocalDirs > 0);
+ String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
+ File targetWorkDir = new File("target", clusterNameTrimmed);
+ try {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(targetWorkDir.getAbsolutePath()), true);
+ } catch (Exception e) {
+ LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+ throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+ }
+
+ if (Shell.WINDOWS) {
+ // The test working directory can exceed the maximum path length supported
+ // by some Windows APIs and cmd.exe (260 characters). To work around this,
+ // create a symlink in temporary storage with a much shorter path,
+ // targeting the full path to the test working directory. Then, use the
+ // symlink as the test working directory.
+ String targetPath = targetWorkDir.getAbsolutePath();
+ File link = new File(System.getProperty("java.io.tmpdir"),
+ String.valueOf(System.currentTimeMillis()));
+ String linkPath = link.getAbsolutePath();
+
+ try {
+ FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+ }
+
+ // Guarantee target exists before creating symlink.
+ targetWorkDir.mkdirs();
+
+ Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+ Shell.getSymlinkCommand(targetPath, linkPath));
+ try {
+ shexec.execute();
+ } catch (IOException e) {
+ throw new YarnRuntimeException(String.format(
+ "failed to create symlink from %s to %s, shell output: %s", linkPath,
+ targetPath, shexec.getOutput()), e);
+ }
+
+ this.testWorkDir = link;
+ } else {
+ this.testWorkDir = targetWorkDir;
+ }
+ this.numExecutorsPerService = numExecutorsPerService;
+ this.availableMemory = availableMemory;
+
+ // Setup Local Dirs
+ localDirs = new String[numLocalDirs];
+ for (int i = 0 ; i < numLocalDirs ; i++) {
+ File f = new File(testWorkDir, "localDir");
+ f.mkdirs();
+ LOG.info("Created localDir: " + f.getAbsolutePath());
+ localDirs[i] = f.getAbsolutePath();
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
+ tezTestService.init(conf);
+
+ }
+
+ @Override
+ public void serviceStart() {
+ tezTestService.start();
+
+ clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
+ getServiceAddress().getHostName());
+ clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
+ getServiceAddress().getPort());
+
+ clusterSpecificConfiguration.setInt(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+ numExecutorsPerService);
+ clusterSpecificConfiguration.setLong(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
+ }
+
+ @Override
+ public void serviceStop() {
+ tezTestService.stop();
+ }
+
+ /**
+ * return the address at which the service is listening
+ * @return host:port
+ */
+ public InetSocketAddress getServiceAddress() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return tezTestService.getListenerAddress();
+ }
+
+ public int getShufflePort() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return tezTestService.getShufflePort();
+ }
+
+ public Configuration getClusterSpecificConfiguration() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return clusterSpecificConfiguration;
+ }
+
+ // Mainly for verification
+ public int getNumSubmissions() {
+ return tezTestService.getNumSubmissions();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
new file mode 100644
index 0000000..bf4a5bd
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+public class TezTestServiceConfConstants {
+
+ private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service.";
+
+ /** Number of executors per instance - used by the scheduler */
+ public static final String TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "num.executors.per-instance";
+
+ /** Memory available per instance - used by the scheduler */
+ public static final String TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB = TEZ_TEST_SERVICE_PREFIX + "memory.per.instance.mb";
+
+ /** CPUs available per instance - used by the scheduler */
+ public static final String TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE = TEZ_TEST_SERVICE_PREFIX + "vcpus.per.instance";
+
+
+ /** Hosts on which the service is running. Currently assuming a single port for all instances */
+ public static final String TEZ_TEST_SERVICE_HOSTS = TEZ_TEST_SERVICE_PREFIX + "hosts";
+
+ /** Port on which the Service(s) listen. Current a single port for all instances */
+ public static final String TEZ_TEST_SERVICE_RPC_PORT = TEZ_TEST_SERVICE_PREFIX + "rpc.port";
+
+ /** Number of threads to use in the AM to communicate with the external service */
+ public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS = TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
+ public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aadd0492/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
new file mode 100644
index 0000000..1108f72
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+
+@ProtocolInfo(protocolName = "org.apache.tez.service.TezTestServiceProtocolBlockingPB", protocolVersion = 1)
+public interface TezTestServiceProtocolBlockingPB extends TezTestServiceProtocolProtos.TezTestServiceProtocol.BlockingInterface {
+}
\ No newline at end of file