You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/14 02:24:34 UTC
[1/3] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 4f560543d -> 4e8ee895e
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
new file mode 100644
index 0000000..a93c1a4
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end test that runs the HashJoinExample pipeline (data generation, join, validation)
+ * through a shared Tez session whose task scheduler, container launcher and task communicator
+ * are the test-service implementations backed by a MiniTezTestServiceCluster.
+ */
+public class TestExternalTezServices {
+
+  private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
+
+  private static MiniTezCluster tezCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static MiniTezTestServiceCluster tezTestServiceCluster;
+
+  private static final Configuration clusterConf = new Configuration();
+  private static Configuration confForJobs;
+
+  private static FileSystem remoteFs;
+  private static FileSystem localFs;
+
+  private static TezClient sharedTezClient;
+
+  // Base directory for the MiniDFSCluster's on-disk state; removed again in tearDown().
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestExternalTezServices.class.getName() + "-tmpDir";
+
+  /**
+   * Starts HDFS, a MiniTezCluster, the MiniTezTestServiceCluster and a shared Tez session
+   * configured to push tasks into the test service.
+   */
+  @BeforeClass
+  public static void setup() throws IOException, TezException, InterruptedException {
+
+    localFs = FileSystem.getLocal(clusterConf);
+
+    try {
+      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+      LOG.info("MiniDFSCluster started");
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezCluster.init(conf);
+    tezCluster.start();
+    LOG.info("MiniTezCluster started");
+
+    // The test service and the jobs inherit the HDFS and MiniTezCluster settings.
+    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
+      clusterConf.set(entry.getKey(), entry.getValue());
+    }
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    // Half of this JVM's max heap is handed to the service cluster; see
+    // MiniTezTestServiceCluster.create for the meaning of the other arguments.
+    tezTestServiceCluster = MiniTezTestServiceCluster
+        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+
+    // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution)
+
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    // This is currently configured to push tasks into the Service, and then use the standard RPC
+    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
+        TezTestServiceTaskSchedulerService.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
+        TezTestServiceNoOpContainerLauncher.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
+        TezTestServiceTaskCommunicatorImpl.class.getName());
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
+        tezConf, true);
+    sharedTezClient.start();
+    LOG.info("Shared TezSession started");
+    sharedTezClient.waitTillReady();
+    LOG.info("Shared TezSession ready for submission");
+
+  }
+
+  /**
+   * Stops everything started in setup(), in reverse order, and removes the local directory
+   * that backed the MiniDFSCluster.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    if (sharedTezClient != null) {
+      sharedTezClient.stop();
+      sharedTezClient = null;
+    }
+
+    if (tezTestServiceCluster != null) {
+      tezTestServiceCluster.stop();
+      tezTestServiceCluster = null;
+    }
+
+    if (tezCluster != null) {
+      tezCluster.stop();
+      tezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    remoteFs = null;
+
+    // Only delete once HDFS is down, since the MiniDFSCluster stores its data under this dir.
+    if (localFs != null) {
+      localFs.delete(new Path(TEST_ROOT_DIR), true);
+      localFs = null;
+    }
+  }
+
+
+  /**
+   * Runs JoinDataGen, HashJoinExample and JoinValidate through the shared session, then checks
+   * that the external test service actually received submissions.
+   */
+  @Test(timeout = 60000)
+  public void test1() throws Exception {
+    Path testDir = new Path("/tmp/testHashJoinExample");
+
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        expectedOutputPath.toString(), "2"};
+    assertEquals("JoinDataGen failed", 0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
+    assertEquals("HashJoinExample failed", 0, joinExample.run(tezConf, args, sharedTezClient));
+
+    JoinValidate joinValidate = new JoinValidate();
+    String[] validateArgs = new String[]{
+        expectedOutputPath.toString(), outPath.toString(), "3"};
+    assertEquals("JoinValidate failed", 0,
+        joinValidate.run(tezConf, validateArgs, sharedTezClient));
+
+    // Ensure this was actually submitted to the external cluster
+    assertTrue("No work was submitted to the external test service",
+        tezTestServiceCluster.getNumSubmissions() > 0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
new file mode 100644
index 0000000..60ebc53
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+
+/**
+ * Converts between the runtime task records ({@link TaskSpec}, {@link InputSpec},
+ * {@link OutputSpec}, {@link GroupInputSpec}) and the protobuf messages declared in
+ * TezDaemonProtocol.proto.
+ *
+ * <p>Optional descriptors are only written when non-null and only read back when present, so
+ * a {@code null} descriptor round-trips as {@code null}.
+ */
+public class ProtoConverters {
+
+  /**
+   * Rebuilds a {@link TaskSpec} from its protobuf form.
+   *
+   * @param taskSpecProto the serialized task spec
+   * @return the task spec; its processor descriptor is {@code null} if the proto carries none
+   */
+  public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(taskSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (taskSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = DagTypeConverters
+          .convertProcessorDescriptorFromDAGPlan(taskSpecProto.getProcessorDescriptor());
+    }
+
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(taskSpecProto.getInputSpecsCount());
+    for (IOSpecProto inputSpecProto : taskSpecProto.getInputSpecsList()) {
+      inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(taskSpecProto.getOutputSpecsCount());
+    for (IOSpecProto outputSpecProto : taskSpecProto.getOutputSpecsList()) {
+      outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(taskSpecProto.getGroupedInputSpecsCount());
+    for (GroupInputSpecProto groupInputSpecProto : taskSpecProto.getGroupedInputSpecsList()) {
+      groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+    }
+
+    return new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
+        taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList, outputSpecList,
+        groupInputSpecs);
+  }
+
+  /**
+   * Serializes a {@link TaskSpec} into a {@link TaskSpecProto}.
+   *
+   * @param taskSpec the task spec to convert; null input/output/group lists are treated as empty
+   * @return the protobuf form of {@code taskSpec}
+   */
+  public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(
+          DagTypeConverters.convertToDAGPlan(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+      }
+    }
+    return builder.build();
+  }
+
+
+  /**
+   * Rebuilds an {@link InputSpec}; the proto's connected vertex is the input's source vertex.
+   *
+   * @param inputSpecProto the serialized input spec
+   * @return the input spec; its descriptor is {@code null} if the proto carries none
+   */
+  public static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          DagTypeConverters.convertInputDescriptorFromDAGPlan(inputSpecProto.getIoDescriptor());
+    }
+    return new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /**
+   * Serializes an {@link InputSpec}; null source vertex names and descriptors are omitted.
+   *
+   * @param inputSpec the input spec to convert
+   * @return the protobuf form of {@code inputSpec}
+   */
+  public static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an {@link OutputSpec}; the proto's connected vertex is the output's destination.
+   *
+   * @param outputSpecProto the serialized output spec
+   * @return the output spec; its descriptor is {@code null} if the proto carries none
+   */
+  public static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          DagTypeConverters.convertOutputDescriptorFromDAGPlan(outputSpecProto.getIoDescriptor());
+    }
+    return new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+        outputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /**
+   * Serializes an {@link OutputSpec}; null destination vertex names and descriptors are omitted.
+   *
+   * @param outputSpec the output spec to convert
+   * @return the protobuf form of {@code outputSpec}
+   */
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a {@link GroupInputSpec}.
+   *
+   * @param groupInputSpecProto the serialized group input spec
+   * @return the group input spec; the merged input descriptor is {@code null} when the optional
+   *     proto field is absent (previously an empty default descriptor was converted instead)
+   */
+  public static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
+    InputDescriptor mergedInputDescriptor = null;
+    if (groupInputSpecProto.hasMergedInputDescriptor()) {
+      mergedInputDescriptor = DagTypeConverters
+          .convertInputDescriptorFromDAGPlan(groupInputSpecProto.getMergedInputDescriptor());
+    }
+    return new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(), mergedInputDescriptor);
+  }
+
+  /**
+   * Serializes a {@link GroupInputSpec}. Null fields are omitted rather than passed to the
+   * protobuf builder, whose setters reject {@code null}.
+   *
+   * @param groupInputSpec the group input spec to convert
+   * @return the protobuf form of {@code groupInputSpec}
+   */
+  public static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    if (groupInputSpec.getGroupName() != null) {
+      builder.setGroupName(groupInputSpec.getGroupName());
+    }
+    if (groupInputSpec.getGroupVertices() != null) {
+      builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    }
+    if (groupInputSpec.getMergedInputDescriptor() != null) {
+      builder.setMergedInputDescriptor(
+          DagTypeConverters.convertToDAGPlan(groupInputSpec.getMergedInputDescriptor()));
+    }
+    return builder.build();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
new file mode 100644
index 0000000..2f8b2e6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.test.service.rpc";
+option java_outer_classname = "TezTestServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "DAGApiRecords.proto";
+
+// Shared encoding for task inputs and outputs (see ProtoConverters). connected_vertex_name holds
+// the source vertex for an InputSpec and the destination vertex for an OutputSpec.
+message IOSpecProto {
+ optional string connected_vertex_name = 1;
+ optional TezEntityDescriptorProto io_descriptor = 2;
+ optional int32 physical_edge_count = 3;
+}
+
+// Encoding of a GroupInputSpec: the group's name, its member vertices and the descriptor of the
+// merged input.
+message GroupInputSpecProto {
+ optional string group_name = 1;
+ repeated string group_vertices = 2;
+ optional TezEntityDescriptorProto merged_input_descriptor = 3;
+}
+
+// Encoding of a TaskSpec. task_attempt_id_string is the TezTaskAttemptID in its toString() form
+// and is parsed back with TezTaskAttemptID.fromString().
+message TaskSpecProto {
+ optional string task_attempt_id_string = 1;
+ optional string dag_name = 2;
+ optional string vertex_name = 3;
+ optional TezEntityDescriptorProto processor_descriptor = 4;
+ repeated IOSpecProto input_specs = 5;
+ repeated IOSpecProto output_specs = 6;
+ repeated GroupInputSpecProto grouped_input_specs = 7;
+ optional int32 vertex_parallelism = 8;
+}
+
+
+// Request for the submitWork RPC. Fields 1-8 mirror RunContainerRequestProto; task_spec carries
+// the work itself. NOTE(review): the encoding of credentials_binary is not visible here;
+// presumably serialized Hadoop Credentials — confirm against the client implementation.
+message SubmitWorkRequestProto {
+ optional string container_id_string = 1;
+ optional string am_host = 2;
+ optional int32 am_port = 3;
+ optional string token_identifier = 4;
+ optional bytes credentials_binary = 5;
+ optional string user = 6;
+ optional string application_id_string = 7;
+ optional int32 app_attempt_number = 8;
+ optional TaskSpecProto task_spec = 9;
+}
+
+// Empty acknowledgement for submitWork.
+message SubmitWorkResponseProto {
+}
+
+
+
+// Request for the runContainer RPC: the same identification fields as SubmitWorkRequestProto
+// (same numbers), without a task spec.
+message RunContainerRequestProto {
+ optional string container_id_string = 1;
+ optional string am_host = 2;
+ optional int32 am_port = 3;
+ optional string token_identifier = 4;
+ optional bytes credentials_binary = 5;
+ optional string user = 6;
+ optional string application_id_string = 7;
+ optional int32 app_attempt_number = 8;
+}
+
+// Empty acknowledgement for runContainer.
+message RunContainerResponseProto {
+}
+
+// RPC interface exposed by the test service (java_generic_services generates the Java stubs).
+service TezTestServiceProtocol {
+ rpc runContainer(RunContainerRequestProto) returns (RunContainerResponseProto);
+ rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/resources/log4j.properties b/tez-ext-service-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..531b68b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+# Send INFO and above from every logger to the console appender defined below.
+log4j.rootLogger=info,stdout
+# Repository-wide threshold; the key must be spelled "threshold" for log4j to honor it.
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 6164e52..30f80a5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -390,7 +390,7 @@ public class TezChild {
private final Throwable throwable;
private final String errorMessage;
- ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
+ public ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable,
@Nullable String errorMessage) {
this.exitStatus = exitStatus;
this.throwable = throwable;
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 6606481..34f2fc6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -67,7 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
private final AtomicBoolean taskRunning;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
- TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
+ public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs,
TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
[3/3] tez git commit: TEZ-2090. Add tests for jobs running in external services. (sseth)
Posted by ss...@apache.org.
TEZ-2090. Add tests for jobs running in external services. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4e8ee895
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4e8ee895
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4e8ee895
Branch: refs/heads/TEZ-2003
Commit: 4e8ee895ec8f6b22b95dda662fe7f2579eaf0094
Parents: 4f56054
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 13 17:24:05 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Feb 13 17:24:05 2015 -0800
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
pom.xml | 6 +
.../apache/tez/dag/api/TezConfiguration.java | 2 +
.../apache/tez/dag/api/TaskCommunicator.java | 1 +
.../tez/dag/api/TaskCommunicatorContext.java | 3 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 39 +-
.../tez/dag/app/TezTaskCommunicatorImpl.java | 42 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 +-
tez-ext-service-tests/pom.xml | 161 ++++
.../tez/dag/app/TezTestServiceCommunicator.java | 152 ++++
.../TezTestServiceContainerLauncher.java | 144 ++++
.../TezTestServiceNoOpContainerLauncher.java | 66 ++
.../rm/TezTestServiceTaskSchedulerService.java | 347 ++++++++
.../TezTestServiceTaskCommunicatorImpl.java | 182 ++++
.../org/apache/tez/service/ContainerRunner.java | 27 +
.../tez/service/MiniTezTestServiceCluster.java | 163 ++++
.../service/TezTestServiceConfConstants.java | 41 +
.../TezTestServiceProtocolBlockingPB.java | 22 +
.../tez/service/impl/ContainerRunnerImpl.java | 512 +++++++++++
.../apache/tez/service/impl/TezTestService.java | 126 +++
.../impl/TezTestServiceProtocolClientImpl.java | 82 ++
.../impl/TezTestServiceProtocolServerImpl.java | 133 +++
.../tez/shufflehandler/FadvisedChunkedFile.java | 78 ++
.../tez/shufflehandler/FadvisedFileRegion.java | 160 ++++
.../apache/tez/shufflehandler/IndexCache.java | 199 +++++
.../tez/shufflehandler/ShuffleHandler.java | 840 +++++++++++++++++++
.../tez/tests/TestExternalTezServices.java | 183 ++++
.../org/apache/tez/util/ProtoConverters.java | 172 ++++
.../src/test/proto/TezDaemonProtocol.proto | 84 ++
.../src/test/resources/log4j.properties | 19 +
.../org/apache/tez/runtime/task/TezChild.java | 2 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 2 +-
32 files changed, 3980 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d7e4be5..975ce65 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -1,5 +1,6 @@
ALL CHANGES:
TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
TEZ-2006. Task communication plane needs to be pluggable.
+ TEZ-2090. Add tests for jobs running in external services.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3396587..eb6bcad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-ext-service-tests</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
@@ -620,6 +625,7 @@
<module>tez-dag</module>
<module>tez-ui</module>
<module>tez-plugins</module>
+ <module>tez-ext-service-tests</module>
<module>tez-dist</module>
<module>docs</module>
</modules>
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index df233e1..3414dbd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1140,6 +1140,8 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
@ConfigurationScope(Scope.VERTEX)
public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_AM_TASK_COMMUNICATOR_CLASS = TEZ_AM_PREFIX + "task-communicator.class";
// TODO only validate property here, value can also be validated if necessary
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 97f9c16..c9f85e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.api;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 9b2d889..41675fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -44,5 +44,8 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Move to vertex, taskIndex, version
void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
+ // TODO TEZ-2003 Add an API to register task failure - for example, a communication failure.
+ // This will have to take into consideration the TA_FAILED event
+
// TODO Eventually Add methods to report availability stats to the scheduler.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index f7dfe19..08b50ba 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.app;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,13 +28,16 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -42,6 +47,7 @@ import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.rm.TaskSchedulerService;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
@@ -58,7 +64,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
.getLog(TaskAttemptListenerImpTezDag.class);
private final AppContext context;
- private final TaskCommunicator taskCommunicator;
+ private TaskCommunicator taskCommunicator;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -93,6 +99,32 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
+ public void serviceInit(Configuration conf) {
+ String taskCommClassName = conf.get(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS);
+ if (taskCommClassName == null) {
+ LOG.info("Using Default Task Communicator");
+ this.taskCommunicator = new TezTaskCommunicatorImpl(this);
+ } else {
+ LOG.info("Using TaskCommunicator: " + taskCommClassName);
+ Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
+ .getClazz(taskCommClassName);
+ try {
+ Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
+ ctor.setAccessible(true);
+ this.taskCommunicator = ctor.newInstance(this);
+ } catch (NoSuchMethodException e) {
+ throw new TezUncheckedException(e);
+ } catch (InvocationTargetException e) {
+ throw new TezUncheckedException(e);
+ } catch (InstantiationException e) {
+ throw new TezUncheckedException(e);
+ } catch (IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+ }
+
+ @Override
public void serviceStart() {
taskCommunicator.init(getConfig());
taskCommunicator.start();
@@ -100,7 +132,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
@Override
 public void serviceStop() {
- taskCommunicator.stop();
+ // taskCommunicator is assigned in serviceInit() and is presumably null if init never ran;
+ // clearing the reference afterwards makes a repeated call a no-op.
+ if (taskCommunicator != null) {
+ taskCommunicator.stop();
+ taskCommunicator = null;
+ }
 }
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 5652937..258c927 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -74,16 +74,22 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
new ConcurrentHashMap<TaskAttempt, ContainerId>();
private final TezTaskUmbilicalProtocol taskUmbilical;
+ private final String tokenIdentifier;
+ private final Token<JobTokenIdentifier> sessionToken;
private InetSocketAddress address;
private Server server;
- private static final class ContainerInfo {
+ public static final class ContainerInfo {
- ContainerInfo(ContainerId containerId) {
+ ContainerInfo(ContainerId containerId, String host, int port) {
this.containerId = containerId;
+ this.host = host;
+ this.port = port;
}
- ContainerId containerId;
+ final ContainerId containerId;
+ public final String host;
+ public final int port;
TezHeartbeatResponse lastResponse = null;
TaskSpec taskSpec = null;
long lastRequestId = 0;
@@ -110,6 +116,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
super(TezTaskCommunicatorImpl.class.getName());
this.taskCommunicatorContext = taskCommunicatorContext;
this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
+ this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+ this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
}
@@ -130,9 +138,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
- Token<JobTokenIdentifier> sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
- jobTokenSecretManager.addTokenForJob(
- taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(), sessionToken);
+ jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
server = new RPC.Builder(conf)
.setProtocol(TezTaskUmbilicalProtocol.class)
@@ -182,7 +188,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
public void registerRunningContainer(ContainerId containerId, String host, int port) {
- ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId));
+ ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
if (oldInfo != null) {
throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
}
@@ -230,9 +236,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
". Already registered to containerId: " + oldId);
}
}
-
}
+
@Override
public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
@@ -258,6 +264,18 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return address;
}
+  /**
+   * Returns the identifier under which the session token is registered with the
+   * JobTokenSecretManager: the application id string from the communicator context.
+   */
+  protected String getTokenIdentifier() {
+    return this.tokenIdentifier;
+  }
+
+  /** Returns the session token read from the communicator context's credentials. */
+  protected Token<JobTokenIdentifier> getSessionToken() {
+    return this.sessionToken;
+  }
+
+  /** Returns the context this communicator was constructed with. */
+  protected TaskCommunicatorContext getTaskCommunicatorContext() {
+    return this.taskCommunicatorContext;
+  }
+
public TezTaskUmbilicalProtocol getUmbilical() {
return this.taskUmbilical;
}
@@ -471,4 +489,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}';
}
}
+
+  /**
+   * Looks up the registration for a running container.
+   *
+   * @return the ContainerInfo recorded by registerRunningContainer, or null if none
+   */
+  protected ContainerInfo getContainerInfo(ContainerId containerId) {
+    final ContainerInfo info = registeredContainers.get(containerId);
+    return info;
+  }
+
+  /**
+   * Finds the container a task attempt is currently registered to.
+   *
+   * @return the container id, or null if the attempt is not registered
+   */
+  protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
+    final TaskAttempt key = new TaskAttempt(taskAttemptId);
+    return attemptToContainerMap.get(key);
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 72dcd95..228871f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -350,7 +350,7 @@ public class TaskSchedulerEventHandler extends AbstractService
try {
Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
.getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
- Integer.class, String.class, Configuration.class);
+ int.class, String.class, Configuration.class);
ctor.setAccessible(true);
TaskSchedulerService taskSchedulerService =
ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
new file mode 100644
index 0000000..37f68b1
--- /dev/null
+++ b/tez-ext-service-tests/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>tez</artifactId>
+ <groupId>org.apache.tez</groupId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- TODO TEZ-2003 Merge this into the tez-tests module -->
+ <artifactId>tez-ext-service-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <!-- Compile scope, hence also on the test classpath; it must not be declared a second
+           time with test scope (Maven requires groupId:artifactId:type:classifier to be unique). -->
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <!-- Required for the ShuffleHandler -->
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+ <build>
+ <!--
+ Include all files in src/main/resources. By default, do not apply property
+ substitution (filtering=false), but do apply property substitution to
+ tez-api-version-info.properties (filtering=true), so that version information
+ is substituted without Maven altering any other resource file.
+ NOTE(review): this section (and the tez-api-version-info.properties name) appears
+ to be copied from the tez-api module; confirm this test module ships such a file.
+ -->
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <excludes>
+ <exclude>tez-api-version-info.properties</exclude>
+ </excludes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>tez-api-version-info.properties</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <!--
+ In the generate-sources phase, compile src/test/proto/TezDaemonProtocol.proto
+ into ${project.build.directory}/generated-test-sources/java, with tez-api's
+ proto directory on the protoc import path.
+ -->
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/test/proto</param>
+ <param>${basedir}/../tez-api/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/test/proto</directory>
+ <includes>
+ <include>TezDaemonProtocol.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-test-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
new file mode 100644
index 0000000..ac50878
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/TezTestServiceCommunicator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.service.impl.TezTestServiceProtocolClientImpl;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+/**
+ * Sends RunContainer and SubmitWork requests to test service daemons on behalf of AM-side
+ * components (for example TezTestServiceContainerLauncher). Requests execute asynchronously on
+ * a fixed-size thread pool, and each outcome is delivered through an
+ * {@link ExecuteRequestCallback}. One RPC proxy is cached per host:port pair.
+ */
+public class TezTestServiceCommunicator extends AbstractService {
+
+  private final ConcurrentMap<String, TezTestServiceProtocolBlockingPB> hostProxies;
+  private final ListeningExecutorService executor;
+
+  // TODO Convert this into a singleton
+  /**
+   * @param numThreads number of threads used to execute outgoing requests
+   */
+  public TezTestServiceCommunicator(int numThreads) {
+    super(TezTestServiceCommunicator.class.getSimpleName());
+    // "%d" rather than "%2d": the latter space-pads single-digit thread ids ("# 1").
+    ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("TezTestServiceCommunicator #%d").build());
+    this.hostProxies = new ConcurrentHashMap<String, TezTestServiceProtocolBlockingPB>();
+    this.executor = MoreExecutors.listeningDecorator(localExecutor);
+  }
+
+  @Override
+  public void serviceStop() {
+    // shutdownNow interrupts running requests; requests still queued are never executed,
+    // so their callbacks are not invoked.
+    executor.shutdownNow();
+  }
+
+  /**
+   * Asynchronously sends a RunContainer request to the daemon at host:port.
+   *
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void runContainer(RunContainerRequestProto request, String host, int port,
+      final ExecuteRequestCallback<RunContainerResponseProto> callback) {
+    submitRequest(new RunContainerCallable(request, host, port), callback);
+  }
+
+  /**
+   * Asynchronously sends a SubmitWork request to the daemon at host:port.
+   *
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+      final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+    submitRequest(new SubmitWorkCallable(request, host, port), callback);
+  }
+
+  /** Runs {@code request} on the executor and routes its result or failure to {@code callback}. */
+  private <T extends Message> void submitRequest(Callable<T> request,
+      final ExecuteRequestCallback<T> callback) {
+    ListenableFuture<T> future = executor.submit(request);
+    Futures.addCallback(future, new FutureCallback<T>() {
+      @Override
+      public void onSuccess(T result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+  }
+
+  /** Performs a blocking runContainer RPC against the proxy for hostname:port. */
+  private class RunContainerCallable implements Callable<RunContainerResponseProto> {
+
+    final String hostname;
+    final int port;
+    final RunContainerRequestProto request;
+
+    private RunContainerCallable(RunContainerRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public RunContainerResponseProto call() throws Exception {
+      return getProxy(hostname, port).runContainer(null, request);
+    }
+  }
+
+  /** Performs a blocking submitWork RPC against the proxy for hostname:port. */
+  private class SubmitWorkCallable implements Callable<SubmitWorkResponseProto> {
+    final String hostname;
+    final int port;
+    final SubmitWorkRequestProto request;
+
+    private SubmitWorkCallable(SubmitWorkRequestProto request, String hostname, int port) {
+      this.hostname = hostname;
+      this.port = port;
+      this.request = request;
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      return getProxy(hostname, port).submitWork(null, request);
+    }
+  }
+
+  /** Receives the outcome of an asynchronous request; called from an executor thread. */
+  public interface ExecuteRequestCallback<T extends Message> {
+    void setResponse(T response);
+    void indicateError(Throwable t);
+  }
+
+  /**
+   * Returns the cached proxy for hostname:port, creating it on first use. If two threads race to
+   * create the same proxy, the first one stored wins and the other instance is discarded.
+   */
+  private TezTestServiceProtocolBlockingPB getProxy(String hostname, int port) {
+    String hostId = getHostIdentifier(hostname, port);
+
+    TezTestServiceProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      proxy = new TezTestServiceProtocolClientImpl(getConfig(), hostname, port);
+      TezTestServiceProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
new file mode 100644
index 0000000..e83165b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+/**
+ * ContainerLauncher that "launches" a container by sending a RunContainer request, through
+ * {@link TezTestServiceCommunicator}, to the test service at the container's node. Stop
+ * requests are not forwarded to the service.
+ */
+public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  // TODO Support interruptability of tasks which haven't yet been launched.
+
+  // TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceContainerLauncher.class);
+
+  private final AppContext context;
+  private final String tokenIdentifier;
+  private final TaskAttemptListener tal;
+  // NOTE(review): only validated in the constructor; launches use the port from the event's NodeId.
+  private final int servicePort;
+  private final TezTestServiceCommunicator communicator;
+  private final Clock clock;
+
+
+  /**
+   * Configuration passed in here to set up final parameters.
+   *
+   * @param appContext source of the clock, event/history handlers and application id
+   * @param conf must set TEZ_TEST_SERVICE_RPC_PORT to a positive value; also read for
+   *     TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS
+   * @param tal provides the AM address sent to the service with every launch request
+   * @throws IllegalArgumentException if the RPC port is not configured
+   */
+  public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
+      TaskAttemptListener tal) {
+    super(TezTestServiceContainerLauncher.class.getName());
+    this.clock = appContext.getClock();
+    int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
+
+    this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    Preconditions.checkArgument(servicePort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
+    this.communicator = new TezTestServiceCommunicator(numThreads);
+    this.context = appContext;
+    this.tokenIdentifier = context.getApplicationID().toString();
+    this.tal = tal;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    communicator.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    communicator.stop();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+        RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
+        communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
+            launchEvent.getNodeId().getPort(),
+            new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+              @Override
+              public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+                LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
+                context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+                ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+                    launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+                context.getHistoryHandler().handle(new DAGHistoryEvent(
+                    null, lEvt));
+              }
+
+              @Override
+              public void indicateError(Throwable t) {
+                // Log the container id, as the success path does, rather than the whole Container record.
+                LOG.error("Failed to launch container: " + launchEvent.getContainerId() + " on host: " + launchEvent.getNodeId(), t);
+                sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
+              }
+            });
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        // The service is not contacted; C_NM_STOP_SENT is reported straight back to the AMContainer.
+        // TODO Sending this out for an un-launched container is invalid
+        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
+            AMContainerEventType.C_NM_STOP_SENT));
+        break;
+    }
+  }
+
+  /** Builds the RunContainer request describing the container in {@code event}. */
+  private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+    RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
+    builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+    builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId());
+    builder.setApplicationIdString(
+        event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString());
+    builder.setTokenIdentifier(tokenIdentifier);
+    builder.setContainerIdString(event.getContainer().getId().toString());
+    // ByteString.copyFrom(ByteBuffer) advances the buffer it reads; copy from a duplicate so the
+    // launch context's token buffer keeps its position for any later reader.
+    builder.setCredentialsBinary(
+        ByteString.copyFrom(event.getContainerLaunchContext().getTokens().duplicate()));
+    // TODO Avoid reading this from the environment
+    String user = System.getenv(ApplicationConstants.Environment.USER.name());
+    // Protobuf setters reject null; fall back to the JVM user when USER is not set.
+    builder.setUser(user != null ? user : System.getProperty("user.name"));
+    return builder.build();
+  }
+
+  /** Reports a launch failure for {@code containerId}; the diagnostic text is never null. */
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
+    // Throwable.getMessage() may be null; toString() still names the exception type.
+    String diagnostics = t == null ? "" : (t.getMessage() != null ? t.getMessage() : t.toString());
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, diagnostics));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
new file mode 100644
index 0000000..8c8e486
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * ContainerLauncher that contacts nothing: a launch request is immediately reported as
+ * successful, and a stop request is acknowledged with C_NM_STOP_SENT without further action.
+ */
+public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+
+  static final Log LOG = LogFactory.getLog(TezTestServiceNoOpContainerLauncher.class);
+
+  private final AppContext context;
+  private final Clock clock;
+
+  /**
+   * @param appContext source of the clock, event/history handlers and application attempt id
+   * @param conf unused here; NOTE(review): presumably kept so the signature matches the other
+   *     launchers' constructors — confirm
+   * @param tal unused here (see above)
+   */
+  public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
+      TaskAttemptListener tal) {
+    super(TezTestServiceNoOpContainerLauncher.class.getName());
+    this.context = appContext;
+    this.clock = appContext.getClock();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        reportLaunched((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
+        acknowledgeStop(event);
+        break;
+    }
+  }
+
+  /** Marks the container launched and records a ContainerLaunchedEvent with the history handler. */
+  private void reportLaunched(NMCommunicatorLaunchRequestEvent launchEvent) {
+    LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
+    context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(null, new ContainerLaunchedEvent(
+        launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId()));
+    context.getHistoryHandler().handle(historyEvent);
+  }
+
+  /** Tells the AMContainer that the stop was sent; nothing is actually stopped. */
+  private void acknowledgeStop(NMCommunicatorEvent event) {
+    context.getEventHandler().handle(
+        new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
new file mode 100644
index 0000000..e3c18bf
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.service.TezTestServiceConfConstants;
+
+
+// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
+
+/**
+ * Test {@link TaskSchedulerService} which, instead of requesting containers from YARN, fabricates
+ * container allocations on a fixed set of external service hosts (read from
+ * {@link TezTestServiceConfConstants#TEZ_TEST_SERVICE_HOSTS}). Every allocation gets the same
+ * per-executor resource, derived from the configured per-instance memory, vcores and executor
+ * count.
+ * <p>
+ * An {@link AMRMClientAsync} is used to register with (and unregister from) the RM; its callbacks
+ * are ignored.
+ */
+public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
+
+  private final ExecutorService appCallbackExecutor;
+  private final TaskSchedulerAppCallback appClientDelegate;
+  private final AppContext appContext;
+  private final List<String> serviceHosts;
+  private final ContainerFactory containerFactory;
+  private final Random random = new Random();
+  // Currently all services must be running on the same port.
+  private final int containerPort;
+
+  private final String clientHostname;
+  private final int clientPort;
+  private final String trackingUrl;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  // task -> id of the fabricated container it was allocated; consulted by deallocateTask.
+  private final ConcurrentMap<Object, ContainerId> runningTasks =
+      new ConcurrentHashMap<Object, ContainerId>();
+
+  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+
+  // Per instance
+  private final int memoryPerInstance;
+  private final int coresPerInstance;
+  private final int executorsPerInstance;
+
+  // Per Executor Thread
+  private final Resource resourcePerContainer;
+
+  /**
+   * @param appClient      callback into the AM; wrapped in a TaskSchedulerAppCallbackWrapper
+   *                       backed by a single-threaded executor
+   * @param appContext     used to build container ids from the application attempt id
+   * @param clientHostname host reported to the RM on registration
+   * @param clientPort     port reported to the RM on registration
+   * @param trackingUrl    tracking URL reported to the RM on registration
+   * @param conf           must set the per-instance memory, executors-per-instance and RPC port
+   *                       keys from {@link TezTestServiceConfConstants}
+   * @throws IllegalArgumentException if one of those keys is missing or not positive
+   */
+  public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
+                                            AppContext appContext,
+                                            String clientHostname, int clientPort,
+                                            String trackingUrl,
+                                            Configuration conf) {
+    // Accepting configuration here to allow setting up fields as final
+    super(TezTestServiceTaskSchedulerService.class.getName());
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    this.appClientDelegate = createAppCallbackDelegate(appClient);
+    this.appContext = appContext;
+    this.serviceHosts = new LinkedList<String>();
+    this.containerFactory = new ContainerFactory(appContext);
+
+    this.memoryPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
+    Preconditions.checkArgument(memoryPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB +
+            " must be configured");
+
+    this.executorsPerInstance = conf.getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+        -1);
+    Preconditions.checkArgument(executorsPerInstance > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE +
+            " must be configured");
+
+    this.coresPerInstance = conf
+        .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE,
+            executorsPerInstance);
+
+    this.containerPort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    // Validate the port itself: it has no usable default and is stamped on every container.
+    Preconditions.checkArgument(containerPort > 0,
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
+
+    this.clientHostname = clientHostname;
+    this.clientPort = clientPort;
+    this.trackingUrl = trackingUrl;
+
+    int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
+    int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
+    this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
+    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
+
+    String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
+    if (hosts == null || hosts.length == 0) {
+      hosts = new String[]{"localhost"};
+    }
+    serviceHosts.addAll(Arrays.asList(hosts));
+
+    LOG.info("Running with configuration: " +
+        "memoryPerInstance=" + memoryPerInstance +
+        ", vcoresPerInstance=" + coresPerInstance +
+        ", executorsPerInstance=" + executorsPerInstance +
+        ", resourcePerContainerInferred=" + resourcePerContainer +
+        ", hosts=" + serviceHosts.toString());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    amRmClient.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    amRmClient.start();
+    try {
+      amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
+    } catch (YarnException e) {
+      throw new TezUncheckedException(e);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /**
+   * Unregisters from the RM (first call only) and releases the callback executor and the AMRM
+   * client, even if unregistration fails.
+   *
+   * @throws TezUncheckedException wrapping a YarnException / IOException from unregistration
+   */
+  @Override
+  public void serviceStop() {
+    if (!this.isStopped.getAndSet(true)) {
+      try {
+        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
+            status.postCompletionTrackingUrl);
+      } catch (YarnException e) {
+        throw new TezUncheckedException(e);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      } finally {
+        // Always release threads. This is the only place amRmClient is stopped.
+        appCallbackExecutor.shutdownNow();
+        amRmClient.stop();
+      }
+    }
+  }
+
+  @Override
+  public Resource getAvailableResources() {
+    // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
+    return getClusterCapacity();
+  }
+
+  @Override
+  public int getClusterNodeCount() {
+    return serviceHosts.size();
+  }
+
+  @Override
+  public void resetMatchLocalityForAllHeldContainers() {
+  }
+
+  @Override
+  public Resource getTotalResources() {
+    return getClusterCapacity();
+  }
+
+  @Override
+  public void blacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: BlacklistNode not supported");
+  }
+
+  @Override
+  public void unblacklistNode(NodeId nodeId) {
+    LOG.info("DEBUG: unBlacklistNode not supported");
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+      Priority priority, Object containerSignature, Object clientCookie) {
+    allocateOnHost(task, selectHost(hosts), priority, clientCookie);
+  }
+
+  @Override
+  public void allocateTask(Object task, Resource capability, ContainerId containerId,
+      Priority priority, Object containerSignature, Object clientCookie) {
+    // The requested containerId is not honoured; a new container is placed on a random host.
+    allocateOnHost(task, selectHost(null), priority, clientCookie);
+  }
+
+  @Override
+  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+    ContainerId containerId = runningTasks.remove(task);
+    if (containerId == null) {
+      LOG.error("Could not determine ContainerId for task: " + task +
+          " . Could have hit a race condition. Ignoring." +
+          " The query may hang since this \"unknown\" container is now taking up a slot permanently");
+      return false;
+    }
+    appClientDelegate.containerBeingReleased(containerId);
+    return true;
+  }
+
+  @Override
+  public Object deallocateContainer(ContainerId containerId) {
+    LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+    return null;
+  }
+
+  @Override
+  public void setShouldUnregister() {
+    // No-op: serviceStop() always unregisters on its first invocation.
+  }
+
+  @Override
+  public boolean hasUnregistered() {
+    // Always true. Unregistration from the RM is done unconditionally in serviceStop().
+    return true;
+  }
+
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+  }
+
+  private TaskSchedulerAppCallback createAppCallbackDelegate(
+      TaskSchedulerAppCallback realAppClient) {
+    return new TaskSchedulerAppCallbackWrapper(realAppClient,
+        appCallbackExecutor);
+  }
+
+  /**
+   * Combined capacity of all configured service hosts. The products are computed in long
+   * arithmetic so that {@link Ints#checkedCast} detects int overflow instead of receiving an
+   * already-wrapped value.
+   */
+  private Resource getClusterCapacity() {
+    long numHosts = serviceHosts.size();
+    return Resource.newInstance(Ints.checkedCast(numHosts * memoryPerInstance),
+        Ints.checkedCast(numHosts * coresPerInstance));
+  }
+
+  /**
+   * Fabricates a container of {@link #resourcePerContainer} on {@code host}, records it against
+   * {@code task}, and reports the allocation to the app callback. Any requested capability is
+   * ignored by the callers.
+   */
+  private void allocateOnHost(Object task, String host, Priority priority, Object clientCookie) {
+    Container container =
+        containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
+    runningTasks.put(task, container.getId());
+    appClientDelegate.taskAllocated(task, clientCookie, container);
+  }
+
+  /**
+   * Picks a host for a new container: the lexicographically smallest of {@code requestedHosts} if
+   * any were given, otherwise a random configured service host. The caller's array is not
+   * modified.
+   */
+  private String selectHost(String[] requestedHosts) {
+    String host;
+    if (requestedHosts != null && requestedHosts.length > 0) {
+      // Sort a copy: the array belongs to the caller and must not be reordered as a side effect.
+      String[] sortedHosts = Arrays.copyOf(requestedHosts, requestedHosts.length);
+      Arrays.sort(sortedHosts);
+      host = sortedHosts[0];
+      LOG.info("Selected host: " + host + " from requested hosts: " + Arrays.toString(sortedHosts));
+    } else {
+      host = serviceHosts.get(random.nextInt(serviceHosts.size()));
+      LOG.info("Selected random host: " + host + " since the request contained no host information");
+    }
+    return host;
+  }
+
+  /** Creates containers with sequentially numbered ids under the current application attempt. */
+  static class ContainerFactory {
+    final AppContext appContext;
+    // NOTE(review): presumably starts at 2 because id 1 is the AM's own container - confirm.
+    final AtomicInteger nextId;
+
+    public ContainerFactory(AppContext appContext) {
+      this.appContext = appContext;
+      this.nextId = new AtomicInteger(2);
+    }
+
+    public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
+      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      NodeId nodeId = NodeId.newInstance(hostname, port);
+      // Use the real host name; the HTTP port is a placeholder.
+      String nodeHttpAddress = hostname + ":0";
+
+      Container container = Container.newInstance(containerId,
+          nodeId,
+          nodeHttpAddress,
+          capability,
+          priority,
+          null);
+
+      return container;
+    }
+  }
+
+  /** AMRM callback handler that ignores every callback and always reports zero progress. */
+  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+
+    }
+
+    @Override
+    public void onShutdownRequest() {
+
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {
+
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
new file mode 100644
index 0000000..78cdcde
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskcomm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
+import org.apache.tez.dag.app.TezTestServiceCommunicator;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+import org.apache.tez.util.ProtoConverters;
+
+
+/**
+ * A {@link TezTaskCommunicatorImpl} that, in addition to the base class behaviour, pushes every
+ * registered task attempt to the external test service via a SubmitWork RPC sent to the host and
+ * port recorded for its container.
+ */
+public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceTaskCommunicatorImpl.class);
+
+  private final TezTestServiceCommunicator communicator;
+  // Request fields shared by every task attempt of this AM attempt.
+  private final SubmitWorkRequestProto baseSubmitWorkRequest;
+  // DAG name -> serialized credentials, so credentials are serialized once per DAG.
+  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  public TezTestServiceTaskCommunicatorImpl(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+    // TODO Maybe make this configurable
+    this.communicator = new TezTestServiceCommunicator(3);
+
+    SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
+
+    // TODO Avoid reading this from the environment
+    String user = System.getenv(ApplicationConstants.Environment.USER.name());
+    if (user == null) {
+      // Protobuf setters reject null (e.g. when USER is not exported); fall back to the JVM user.
+      user = System.getProperty("user.name");
+    }
+    baseBuilder.setUser(user);
+    baseBuilder.setApplicationIdString(
+        taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
+    baseBuilder
+        .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
+    baseBuilder.setTokenIdentifier(getTokenIdentifier());
+
+    baseSubmitWorkRequest = baseBuilder.build();
+
+    credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.communicator.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    super.serviceStart();
+    this.communicator.start();
+  }
+
+  /** Stops the RPC communicator started in serviceStart(), then the base class services. */
+  @Override
+  public void serviceStop() {
+    try {
+      this.communicator.stop();
+    } finally {
+      super.serviceStop();
+    }
+  }
+
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
+    super.registerRunningContainer(containerId, hostname, port);
+  }
+
+  @Override
+  public void registerContainerEnd(ContainerId containerId) {
+    super.registerContainerEnd(containerId);
+  }
+
+  /**
+   * Registers the attempt with the base class, then submits it asynchronously to the service
+   * instance recorded for {@code containerId}.
+   *
+   * @throws RuntimeException if the request cannot be built or the container is unknown
+   */
+  @Override
+  public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
+                                         Map<String, LocalResource> additionalResources,
+                                         Credentials credentials,
+                                         boolean credentialsChanged) {
+    super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+        credentialsChanged);
+    SubmitWorkRequestProto requestProto;
+    try {
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to construct request", e);
+    }
+    ContainerInfo containerInfo = getContainerInfo(containerId);
+    if (containerInfo == null) {
+      // TODO Handle this properly
+      throw new RuntimeException("ContainerInfo not found for container: " + containerId +
+          ", while trying to launch task: " + taskSpec.getTaskAttemptID());
+    }
+    String host;
+    int port;
+    synchronized (containerInfo) {
+      host = containerInfo.host;
+      port = containerInfo.port;
+    }
+    communicator.submitWork(requestProto, host, port,
+        new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(SubmitWorkResponseProto response) {
+            LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
+            getTaskCommunicatorContext()
+                .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            // TODO Handle this error. This is where an API on the context to indicate failure / rejection comes in.
+            LOG.warn("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                containerId, t);
+          }
+        });
+  }
+
+  @Override
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+    super.unregisterRunningTaskAttempt(taskAttemptID);
+    // Nothing else to do for now. The push API in the test does not support termination of a running task
+  }
+
+  /** Builds the SubmitWork request for one attempt on top of {@link #baseSubmitWorkRequest}. */
+  private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
+                                                            TaskSpec taskSpec) throws
+      IOException {
+    SubmitWorkRequestProto.Builder builder =
+        SubmitWorkRequestProto.newBuilder(baseSubmitWorkRequest);
+    builder.setContainerIdString(containerId.toString());
+    builder.setAmHost(getAddress().getHostName());
+    builder.setAmPort(getAddress().getPort());
+
+    // Credentials can change across DAGs, so they are cached per DAG name.
+    // ByteString.copyFrom(ByteBuffer) consumes the buffer's remaining bytes, so only duplicates
+    // are ever handed out; the cached instance itself is never read.
+    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    if (credentialsBinary == null) {
+      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+    } else {
+      credentialsBinary = credentialsBinary.duplicate();
+    }
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
+    return builder.build();
+  }
+
+  /** Serializes a copy of {@code credentials} in Hadoop token-storage format. */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokensDob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokensDob);
+    return ByteBuffer.wrap(containerTokensDob.getData(), 0, containerTokensDob.getLength());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
new file mode 100644
index 0000000..2bca4ed
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/ContainerRunner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.IOException;
+
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+
+/**
+ * Entry points through which the test service accepts work.
+ * NOTE(review): exact semantics are defined by the implementations; confirm before relying on them.
+ */
+public interface ContainerRunner {
+
+  /**
+   * Accepts a task submission.
+   *
+   * @param request the submission
+   * @throws IOException as declared; meaning is implementation-defined
+   */
+  void submitWork(SubmitWorkRequestProto request) throws IOException;
+
+  /**
+   * Accepts a container run request.
+   *
+   * @param request the container request
+   * @throws IOException as declared; meaning is implementation-defined
+   */
+  void queueContainer(RunContainerRequestProto request) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
new file mode 100644
index 0000000..f47bd67
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/MiniTezTestServiceCluster.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.service.impl.TezTestService;
+
+public class MiniTezTestServiceCluster extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(MiniTezTestServiceCluster.class);
+
+ private final File testWorkDir;
+ private final long availableMemory;
+ private final int numExecutorsPerService;
+ private final String[] localDirs;
+ private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+ private TezTestService tezTestService;
+
+ public static MiniTezTestServiceCluster create(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+ return new MiniTezTestServiceCluster(clusterName, numExecutorsPerService, availableMemory, numLocalDirs);
+ }
+
+ // TODO Add support for multiple instances
+ private MiniTezTestServiceCluster(String clusterName, int numExecutorsPerService, long availableMemory, int numLocalDirs) {
+ super(clusterName + "_TezTestServerCluster");
+ Preconditions.checkArgument(numExecutorsPerService > 0);
+ Preconditions.checkArgument(availableMemory > 0);
+ Preconditions.checkArgument(numLocalDirs > 0);
+ String clusterNameTrimmed = clusterName.replace("$", "") + "_TezTestServerCluster";
+ File targetWorkDir = new File("target", clusterNameTrimmed);
+ try {
+ FileContext.getLocalFSFileContext().delete(
+ new Path(targetWorkDir.getAbsolutePath()), true);
+ } catch (Exception e) {
+ LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+ throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+ }
+
+ if (Shell.WINDOWS) {
+ // The test working directory can exceed the maximum path length supported
+ // by some Windows APIs and cmd.exe (260 characters). To work around this,
+ // create a symlink in temporary storage with a much shorter path,
+ // targeting the full path to the test working directory. Then, use the
+ // symlink as the test working directory.
+ String targetPath = targetWorkDir.getAbsolutePath();
+ File link = new File(System.getProperty("java.io.tmpdir"),
+ String.valueOf(System.currentTimeMillis()));
+ String linkPath = link.getAbsolutePath();
+
+ try {
+ FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+ }
+
+ // Guarantee target exists before creating symlink.
+ targetWorkDir.mkdirs();
+
+ Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+ Shell.getSymlinkCommand(targetPath, linkPath));
+ try {
+ shexec.execute();
+ } catch (IOException e) {
+ throw new YarnRuntimeException(String.format(
+ "failed to create symlink from %s to %s, shell output: %s", linkPath,
+ targetPath, shexec.getOutput()), e);
+ }
+
+ this.testWorkDir = link;
+ } else {
+ this.testWorkDir = targetWorkDir;
+ }
+ this.numExecutorsPerService = numExecutorsPerService;
+ this.availableMemory = availableMemory;
+
+ // Setup Local Dirs
+ localDirs = new String[numLocalDirs];
+ for (int i = 0 ; i < numLocalDirs ; i++) {
+ File f = new File(testWorkDir, "localDir");
+ f.mkdirs();
+ LOG.info("Created localDir: " + f.getAbsolutePath());
+ localDirs[i] = f.getAbsolutePath();
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ tezTestService = new TezTestService(conf, numExecutorsPerService, availableMemory, localDirs);
+ tezTestService.init(conf);
+
+ }
+
+ @Override
+ public void serviceStart() {
+ tezTestService.start();
+
+ clusterSpecificConfiguration.set(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS,
+ getServiceAddress().getHostName());
+ clusterSpecificConfiguration.setInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT,
+ getServiceAddress().getPort());
+
+ clusterSpecificConfiguration.setInt(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE,
+ numExecutorsPerService);
+ clusterSpecificConfiguration.setLong(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, availableMemory);
+ }
+
+ @Override
+ public void serviceStop() {
+ tezTestService.stop();
+ }
+
+ /**
+ * return the address at which the service is listening
+ * @return host:port
+ */
+ public InetSocketAddress getServiceAddress() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return tezTestService.getListenerAddress();
+ }
+
+ public int getShufflePort() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return tezTestService.getShufflePort();
+ }
+
+ public Configuration getClusterSpecificConfiguration() {
+ Preconditions.checkState(getServiceState() == STATE.STARTED);
+ return clusterSpecificConfiguration;
+ }
+
+ // Mainly for verification
+ public int getNumSubmissions() {
+ return tezTestService.getNumSubmissions();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
new file mode 100644
index 0000000..bf4a5bd
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+/**
+ * Configuration keys shared by the external test service, its scheduler and the AM-side
+ * communicator. All keys live under the {@code tez.test.service.} prefix.
+ */
+public class TezTestServiceConfConstants {
+
+  private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service.";
+
+  // ---- Capacity of each service instance (used by the scheduler) ----
+
+  /** Memory, in MB, available on each service instance. */
+  public static final String TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB =
+      TEZ_TEST_SERVICE_PREFIX + "memory.per.instance.mb";
+
+  /** Virtual CPUs available on each service instance. */
+  public static final String TEZ_TEST_SERVICE_VCPUS_PER_INSTANCE =
+      TEZ_TEST_SERVICE_PREFIX + "vcpus.per.instance";
+
+  /**
+   * Number of executors on each service instance. Note: unlike the other keys, this key is
+   * spelled with "per-instance".
+   */
+  public static final String TEZ_TEST_SERVICE_NUM_EXECUTORS_PER_INSTANCE =
+      TEZ_TEST_SERVICE_PREFIX + "num.executors.per-instance";
+
+  // ---- Where the service runs ----
+
+  /** Hosts on which the service is running. Currently all instances are assumed to share one port. */
+  public static final String TEZ_TEST_SERVICE_HOSTS = TEZ_TEST_SERVICE_PREFIX + "hosts";
+
+  /** Port on which the service instance(s) listen. Currently a single port is used for all instances. */
+  public static final String TEZ_TEST_SERVICE_RPC_PORT = TEZ_TEST_SERVICE_PREFIX + "rpc.port";
+
+  // ---- AM side ----
+
+  /** Number of threads the AM uses to communicate with the external service. */
+  public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS =
+      TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
+
+  /** Default value for {@link #TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS}. */
+  public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
new file mode 100644
index 0000000..1108f72
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceProtocolBlockingPB.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+
+@ProtocolInfo(protocolName = "org.apache.tez.service.TezTestServiceProtocolBlockingPB", protocolVersion = 1)
+public interface TezTestServiceProtocolBlockingPB extends TezTestServiceProtocolProtos.TezTestServiceProtocol.BlockingInterface {
+}
\ No newline at end of file
[2/3] tez git commit: TEZ-2090. Add tests for jobs running in
external services. (sseth)
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
new file mode 100644
index 0000000..4a6ce33
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.task.TaskReporter;
+import org.apache.tez.runtime.task.TezTaskRunner;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.task.TezChild;
+import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
+import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkRequestProto;
+import org.apache.tez.util.ProtoConverters;
+
+/**
+ * Runs Tez containers and individual task attempts inside this (test) service process.
+ *
+ * <p>Two entry points are supported:
+ * {@link #queueContainer} starts a full {@link TezChild}, which uses the regular pull mechanism
+ * to fetch work from the AM. {@link #submitWork} runs a single task pushed by the AM.
+ * Both execute on a fixed-size pool of {@code numExecutors} threads.
+ */
+public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
+
+  private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+
+  // 80% of memory considered for accounted buffers. Rest for objects.
+  // TODO Tune this based on the available size.
+  private static final double EXECUTOR_MEMORY_FRACTION = 0.8;
+
+  private final ListeningExecutorService executorService;
+  private final AtomicReference<InetSocketAddress> localAddress;
+  private final String[] localDirsBase;
+  private final Map<String, String> localEnv = new HashMap<String, String>();
+  private volatile FileSystem localFs;
+  private final long memoryPerExecutor;
+  // TODO Support for removing queued containers, interrupting / killing specific containers - when preemption is supported
+
+  /**
+   * @param numExecutors number of containers / tasks that may run concurrently; must be > 0
+   * @param localDirsBase base local directories; per-application dirs are created under each
+   * @param localAddress holder for this service's address. Its host name is read at submission
+   *                     time to build the {@link ExecutionContext}.
+   * @param totalMemoryAvailableBytes memory for all executors. 80% of it is split evenly
+   *                                  between the executors.
+   * @throws IllegalArgumentException if {@code numExecutors} is not positive
+   */
+  public ContainerRunnerImpl(int numExecutors, String[] localDirsBase,
+      AtomicReference<InetSocketAddress> localAddress,
+      long totalMemoryAvailableBytes) {
+    super("ContainerRunnerImpl");
+    Preconditions.checkArgument(numExecutors > 0,
+        "Invalid number of executors: " + numExecutors + ". Must be > 0");
+    this.localDirsBase = localDirsBase;
+    this.localAddress = localAddress;
+
+    ExecutorService raw = Executors.newFixedThreadPool(numExecutors,
+        new ThreadFactoryBuilder().setNameFormat("ContainerExecutor %d").build());
+    this.executorService = MoreExecutors.listeningDecorator(raw);
+
+    this.memoryPerExecutor =
+        (long) (totalMemoryAvailableBytes * EXECUTOR_MEMORY_FRACTION / numExecutors);
+
+    LOG.info("ContainerRunnerImpl config: " +
+        "memoryPerExecutorDerived=" + memoryPerExecutor +
+        ", numExecutors=" + numExecutors
+    );
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to setup local filesystem instance", e);
+    }
+  }
+
+  @Override
+  public void serviceStart() {
+  }
+
+  /**
+   * Publishes the shuffle port into the environment handed to every container / task, under
+   * the Tez shuffle service id.
+   */
+  public void setShufflePort(int shufflePort) {
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(
+        TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+        ByteBuffer.allocate(4).putInt(shufflePort), localEnv);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Interrupt running work and release the executor threads. Without this, the pool
+    // created in the constructor is never shut down.
+    executorService.shutdownNow();
+    super.serviceStop();
+  }
+
+  // TODO Move this into a utilities class
+  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
+      String user) {
+    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
+        "appcache" + File.separator + applicationIdString;
+  }
+
+  /** Copies the service-wide environment and adds the submitting user. */
+  private Map<String, String> createTaskEnv(String user) {
+    Map<String, String> env = new HashMap<String, String>(localEnv);
+    env.put(ApplicationConstants.Environment.USER.name(), user);
+    return env;
+  }
+
+  /** Resolves the application-specific dir under every base dir and creates it. */
+  private String[] setupLocalDirs(String applicationIdString, String user) throws IOException {
+    String[] localDirs = new String[localDirsBase.length];
+    for (int i = 0; i < localDirsBase.length; i++) {
+      localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], applicationIdString, user);
+      localFs.mkdirs(new Path(localDirs[i]));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dirs are: " + Arrays.toString(localDirs));
+    }
+    return localDirs;
+  }
+
+  /** Deserializes a token-storage byte stream into a {@link Credentials} instance. */
+  private static Credentials readCredentials(byte[] tokenBytes) throws IOException {
+    Credentials credentials = new Credentials();
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(tokenBytes, tokenBytes.length);
+    credentials.readTokenStorageStream(dib);
+    return credentials;
+  }
+
+  /** Registers the application, its session token and its user with the ShuffleHandler. */
+  private static void registerWithShuffleHandler(String applicationIdString,
+      Credentials credentials, String user) throws IOException {
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+    // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Registering request with the ShuffleHandler for " + applicationIdString);
+    }
+    ShuffleHandler.get().registerApplication(applicationIdString, jobToken, user);
+  }
+
+  /**
+   * Submit a container which is ready for running.
+   * The regular pull mechanism will be used to fetch work from the AM
+   * @param request
+   * @throws IOException if local dirs cannot be created or the credentials cannot be read
+   */
+  @Override
+  public void queueContainer(RunContainerRequestProto request) throws IOException {
+    // The request carries serialized credentials, so only identifying fields are logged.
+    LOG.info("Queuing container for execution: " + request.getApplicationIdString() +
+        ", containerId=" + request.getContainerIdString() + ", user=" + request.getUser());
+
+    Map<String, String> env = createTaskEnv(request.getUser());
+    String[] localDirs = setupLocalDirs(request.getApplicationIdString(), request.getUser());
+
+    // Setup workingDir. This is otherwise setup as Environment.PWD
+    // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
+    String workingDir = localDirs[0];
+
+    Credentials credentials = readCredentials(request.getCredentialsBinary().toByteArray());
+    registerWithShuffleHandler(request.getApplicationIdString(), credentials, request.getUser());
+
+    ContainerRunnerCallable callable = new ContainerRunnerCallable(request,
+        new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()),
+        env, localDirs, workingDir, credentials, memoryPerExecutor);
+    ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
+    Futures.addCallback(future, new ContainerRunnerCallback(request, callable));
+  }
+
+  /**
+   * Submit an entire work unit - containerId + TaskSpec.
+   * This is intended for a task push from the AM
+   *
+   * @param request
+   * @throws IOException if local dirs cannot be created or the credentials cannot be read
+   */
+  @Override
+  public void submitWork(SubmitWorkRequestProto request) throws IOException {
+    // The request carries serialized credentials, so only identifying fields are logged.
+    LOG.info("Queuing work for execution: " + request.getApplicationIdString() +
+        ", containerId=" + request.getContainerIdString() + ", user=" + request.getUser());
+
+    Map<String, String> env = createTaskEnv(request.getUser());
+    String[] localDirs = setupLocalDirs(request.getApplicationIdString(), request.getUser());
+
+    // Setup workingDir. This is otherwise setup as Environment.PWD
+    // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
+    String workingDir = localDirs[0];
+
+    Credentials credentials = readCredentials(request.getCredentialsBinary().toByteArray());
+    registerWithShuffleHandler(request.getApplicationIdString(), credentials, request.getUser());
+
+    TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
+        new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
+        workingDir, credentials, memoryPerExecutor);
+    ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable);
+    Futures.addCallback(future, new TaskRunnerCallback(request, callable));
+  }
+
+  /** Logs the outcome of a container or task run, keyed by application and container id. */
+  private static void logExecutionResult(String applicationIdString, String containerIdString,
+      ContainerExecutionResult result) {
+    switch (result.getExitStatus()) {
+      case SUCCESS:
+        LOG.info("Successfully finished: " + applicationIdString + ", containerId=" +
+            containerIdString);
+        break;
+      case EXECUTION_FAILURE:
+        LOG.warn("Failed to run: " + applicationIdString + ", containerId=" +
+            containerIdString, result.getThrowable());
+        break;
+      case INTERRUPTED:
+        LOG.warn("Interrupted while running: " + applicationIdString + ", containerId=" +
+            containerIdString, result.getThrowable());
+        break;
+      case ASKED_TO_DIE:
+        LOG.info("Asked to die while running: " + applicationIdString + ", containerId=" +
+            containerIdString);
+        break;
+      default:
+        LOG.info("Finished with status " + result.getExitStatus() + ": " + applicationIdString +
+            ", containerId=" + containerIdString);
+        break;
+    }
+  }
+
+  /** Runs a complete {@link TezChild} for a container request. */
+  static class ContainerRunnerCallable implements Callable<ContainerExecutionResult> {
+
+    private final RunContainerRequestProto request;
+    private final Configuration conf;
+    private final String workingDir;
+    private final String[] localDirs;
+    private final Map<String, String> envMap;
+    private final String pid = null;
+    private final ObjectRegistryImpl objectRegistry;
+    private final ExecutionContext executionContext;
+    private final Credentials credentials;
+    private final long memoryAvailable;
+    // Written by call() on the executor thread; read from the failure callback.
+    private volatile TezChild tezChild;
+
+    ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf,
+        ExecutionContext executionContext, Map<String, String> envMap,
+        String[] localDirs, String workingDir, Credentials credentials,
+        long memoryAvailable) {
+      this.request = request;
+      this.conf = conf;
+      this.executionContext = executionContext;
+      this.envMap = envMap;
+      this.workingDir = workingDir;
+      this.localDirs = localDirs;
+      this.objectRegistry = new ObjectRegistryImpl();
+      this.credentials = credentials;
+      this.memoryAvailable = memoryAvailable;
+    }
+
+    @Override
+    public ContainerExecutionResult call() throws Exception {
+      Stopwatch sw = new Stopwatch().start();
+      tezChild = new TezChild(conf, request.getAmHost(), request.getAmPort(),
+          request.getContainerIdString(),
+          request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
+          envMap, objectRegistry, pid,
+          executionContext, credentials, memoryAvailable, request.getUser());
+      ContainerExecutionResult result = tezChild.run();
+      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+          sw.stop().elapsedMillis());
+      return result;
+    }
+
+    /** @return the running TezChild, or null if {@link #call()} has not created it yet */
+    public TezChild getTezChild() {
+      return this.tezChild;
+    }
+  }
+
+  /** Logs the container outcome and shuts the TezChild down if execution threw. */
+  static final class ContainerRunnerCallback implements FutureCallback<ContainerExecutionResult> {
+
+    private final RunContainerRequestProto request;
+    private final ContainerRunnerCallable containerRunnerCallable;
+
+    ContainerRunnerCallback(RunContainerRequestProto request,
+        ContainerRunnerCallable containerRunnerCallable) {
+      this.request = request;
+      this.containerRunnerCallable = containerRunnerCallable;
+    }
+
+    // TODO Proper error handling
+    @Override
+    public void onSuccess(ContainerExecutionResult result) {
+      logExecutionResult(request.getApplicationIdString(), request.getContainerIdString(), result);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error(
+          "TezChild execution failed for : " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), t);
+      TezChild tezChild = containerRunnerCallable.getTezChild();
+      if (tezChild != null) {
+        tezChild.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Runs a single pushed task. It opens an umbilical RPC proxy to the AM (as the token-identifier
+   * user, with the job token) and drives a {@link TezTaskRunner} with a {@link TaskReporter}.
+   */
+  static class TaskRunnerCallable implements Callable<ContainerExecutionResult> {
+
+    private final SubmitWorkRequestProto request;
+    private final Configuration conf;
+    private final String workingDir;
+    private final String[] localDirs;
+    private final Map<String, String> envMap;
+    private final String pid = null;
+    private final ObjectRegistryImpl objectRegistry;
+    private final ExecutionContext executionContext;
+    private final Credentials credentials;
+    private final long memoryAvailable;
+    private final ListeningExecutorService executor;
+    private volatile TezTaskRunner taskRunner;
+    private volatile TaskReporter taskReporter;
+    // Written by call(); read by shutdown() from the completion callback.
+    private volatile TezTaskUmbilicalProtocol umbilical;
+
+    TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf,
+        ExecutionContext executionContext, Map<String, String> envMap,
+        String[] localDirs, String workingDir, Credentials credentials,
+        long memoryAvailable) {
+      this.request = request;
+      this.conf = conf;
+      this.executionContext = executionContext;
+      this.envMap = envMap;
+      this.workingDir = workingDir;
+      this.localDirs = localDirs;
+      this.objectRegistry = new ObjectRegistryImpl();
+      this.credentials = credentials;
+      this.memoryAvailable = memoryAvailable;
+      // TODO This executor seems unnecessary. Here and TezChild
+      ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString()).build());
+      executor = MoreExecutors.listeningDecorator(executorReal);
+    }
+
+    @Override
+    public ContainerExecutionResult call() throws Exception {
+
+      // TODO Consolidate this code with TezChild.
+      Stopwatch sw = new Stopwatch().start();
+      UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser());
+      taskUgi.addCredentials(credentials);
+
+      Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap<String, String> startedInputsMap = HashMultimap.create();
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+        @Override
+        public TezTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+              TezTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+      // TODO Stop reading this on each request.
+      taskReporter = new TaskReporter(
+          umbilical,
+          conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT),
+          conf.getLong(
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS,
+              TezConfiguration.TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT),
+          conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+              TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT),
+          new AtomicLong(0),
+          request.getContainerIdString());
+
+      taskRunner = new TezTaskRunner(conf, taskUgi, localDirs,
+          ProtoConverters.getTaskSpecfromProto(request.getTaskSpec()), umbilical,
+          request.getAppAttemptNumber(),
+          serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
+          pid,
+          executionContext, memoryAvailable);
+
+      try {
+        // run() returning false means the AM asked this task to die via heartbeats.
+        if (!taskRunner.run()) {
+          LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
+          // Report ASKED_TO_DIE (not SUCCESS) so the callback logs the real outcome.
+          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.ASKED_TO_DIE,
+              null, "Asked to die by the AM");
+        }
+      } catch (IOException e) {
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            e, "TaskExecutionFailure: " + e.getMessage());
+      } catch (TezException e) {
+        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
+            e, "TaskExecutionFailure: " + e.getMessage());
+      } finally {
+        FileSystem.closeAllForUGI(taskUgi);
+      }
+      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+          sw.stop().elapsedMillis());
+      return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
+          null);
+    }
+
+    /** Releases the task executor, the heartbeat reporter and the umbilical RPC proxy. */
+    public void shutdown() {
+      executor.shutdownNow();
+      if (taskReporter != null) {
+        taskReporter.shutdown();
+      }
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
+    }
+  }
+
+  /** Logs the task outcome and always releases the callable's resources afterwards. */
+  static final class TaskRunnerCallback implements FutureCallback<ContainerExecutionResult> {
+
+    private final SubmitWorkRequestProto request;
+    private final TaskRunnerCallable taskRunnerCallable;
+
+    TaskRunnerCallback(SubmitWorkRequestProto request,
+        TaskRunnerCallable taskRunnerCallable) {
+      this.request = request;
+      this.taskRunnerCallable = taskRunnerCallable;
+    }
+
+    // TODO Proper error handling
+    @Override
+    public void onSuccess(ContainerExecutionResult result) {
+      logExecutionResult(request.getApplicationIdString(), request.getContainerIdString(), result);
+      taskRunnerCallable.shutdown();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error(
+          "TezTaskRunner execution failed for : " + request.getApplicationIdString() + ", containerId=" +
+              request.getContainerIdString(), t);
+      taskRunnerCallable.shutdown();
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
new file mode 100644
index 0000000..012e352
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.shufflehandler.ShuffleHandler;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+
+/**
+ * A standalone test service that runs Tez containers / tasks outside YARN.
+ *
+ * <p>It combines three components:
+ * <ul>
+ *   <li>an RPC server that accepts requests</li>
+ *   <li>a {@link ContainerRunnerImpl} that executes them</li>
+ *   <li>a {@link ShuffleHandler} that is started on a random port over the given local dirs</li>
+ * </ul>
+ */
+public class TezTestService extends AbstractService implements ContainerRunner {
+
+  private static final Logger LOG = Logger.getLogger(TezTestService.class);
+
+  private final Configuration shuffleHandlerConf;
+  private final int numExecutors;
+
+  private final TezTestServiceProtocolServerImpl server;
+  private final ContainerRunnerImpl containerRunner;
+  private final String[] localDirs;
+
+  // Count of queueContainer + submitWork calls received so far.
+  private final AtomicInteger numSubmissions = new AtomicInteger(0);
+
+  // Shared with the RPC server (which sets it when it starts) and the container runner.
+  private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
+
+  /**
+   * @param conf base configuration; the ShuffleHandler gets a copy of it
+   * @param numExecutors number of concurrent executors; must be > 0
+   * @param memoryAvailable bytes available to executors; must not exceed the JVM max heap
+   * @param localDirs work dirs for the shuffle handler and containers; must be non-empty
+   * @throws IllegalArgumentException if {@code numExecutors} or {@code localDirs} is invalid
+   * @throws IllegalStateException if {@code memoryAvailable} exceeds the JVM max memory
+   */
+  public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) {
+    super(TezTestService.class.getSimpleName());
+    this.numExecutors = numExecutors;
+    this.localDirs = localDirs;
+
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    LOG.info(TezTestService.class.getSimpleName() + " created with the following configuration: " +
+        "numExecutors=" + numExecutors +
+        ", workDirs=" + Arrays.toString(localDirs) +
+        ", memoryAvailable=" + memoryAvailable +
+        ", jvmMaxMemory=" + jvmMax);
+
+    Preconditions.checkArgument(this.numExecutors > 0);
+    Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0,
+        "Work dirs must be specified");
+    Preconditions.checkState(jvmMax >= memoryAvailable,
+        "Invalid configuration. Xmx value too small. maxAvailable=" + jvmMax + ", configured=" +
+            memoryAvailable);
+
+    this.shuffleHandlerConf = new Configuration(conf);
+    // Start Shuffle on a random port
+    this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs));
+
+    this.server = new TezTestServiceProtocolServerImpl(this, address);
+    this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address,
+        memoryAvailable);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    server.init(conf);
+    containerRunner.init(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    ShuffleHandler.initializeAndStart(shuffleHandlerConf);
+    // The runner must know the shuffle port before any request can reach it.
+    containerRunner.setShufflePort(ShuffleHandler.get().getPort());
+    server.start();
+    containerRunner.start();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    // Stop accepting new requests before tearing down the component that executes them.
+    server.stop();
+    containerRunner.stop();
+    // Guard against being stopped after a failed / partial start, before the ShuffleHandler
+    // was initialized (presumably get() returns null then -- TODO confirm).
+    if (ShuffleHandler.get() != null) {
+      ShuffleHandler.get().stop();
+    }
+  }
+
+  public InetSocketAddress getListenerAddress() {
+    return server.getBindAddress();
+  }
+
+  public int getShufflePort() {
+    return ShuffleHandler.get().getPort();
+  }
+
+  @Override
+  public void queueContainer(RunContainerRequestProto request) throws IOException {
+    numSubmissions.incrementAndGet();
+    containerRunner.queueContainer(request);
+  }
+
+  @Override
+  public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws
+      IOException {
+    numSubmissions.incrementAndGet();
+    containerRunner.submitWork(request);
+  }
+
+  /** @return number of container and work submissions received so far */
+  public int getNumSubmissions() {
+    return numSubmissions.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
new file mode 100644
index 0000000..10d2952
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+
+
+/**
+ * Client for the test service. Each RPC is forwarded to a protobuf RPC proxy, which is created
+ * on first use for the configured host and port.
+ */
+public class TezTestServiceProtocolClientImpl implements TezTestServiceProtocolBlockingPB {
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddr;
+  TezTestServiceProtocolBlockingPB proxy;
+
+  public TezTestServiceProtocolClientImpl(Configuration conf, String hostname, int port) {
+    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
+    this.conf = conf;
+  }
+
+  /** Forwards a runContainer call. A failure to create the proxy is wrapped in ServiceException. */
+  @Override
+  public RunContainerResponseProto runContainer(RpcController controller,
+      RunContainerRequestProto request) throws ServiceException {
+    TezTestServiceProtocolBlockingPB delegate;
+    try {
+      delegate = getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return delegate.runContainer(null, request);
+  }
+
+  /** Forwards a submitWork call. A failure to create the proxy is wrapped in ServiceException. */
+  @Override
+  public TezTestServiceProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller,
+      TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws ServiceException {
+    TezTestServiceProtocolBlockingPB delegate;
+    try {
+      delegate = getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return delegate.submitWork(null, request);
+  }
+
+  /** Returns the cached proxy, creating it on first call. */
+  public TezTestServiceProtocolBlockingPB getProxy() throws IOException {
+    if (proxy != null) {
+      return proxy;
+    }
+    proxy = createProxy();
+    return proxy;
+  }
+
+  /** Registers the protobuf engine for the protocol and builds a new RPC proxy to the server. */
+  public TezTestServiceProtocolBlockingPB createProxy() throws IOException {
+    // TODO Fix security
+    RPC.setProtocolEngine(conf, TezTestServiceProtocolBlockingPB.class, ProtobufRpcEngine.class);
+    return (TezTestServiceProtocolBlockingPB) RPC.getProxy(
+        TezTestServiceProtocolBlockingPB.class, 0, serverAddr, conf);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
new file mode 100644
index 0000000..d7f8444
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestServiceProtocolServerImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.service.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.service.ContainerRunner;
+import org.apache.tez.service.TezTestServiceProtocolBlockingPB;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerResponseProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.SubmitWorkResponseProto;
+
+/**
+ * RPC server side of the test-service protocol. It hands incoming requests to a
+ * {@link ContainerRunner} and publishes the address it binds to via the shared
+ * {@link AtomicReference}.
+ */
+public class TezTestServiceProtocolServerImpl extends AbstractService
+    implements TezTestServiceProtocolBlockingPB {
+
+  private static final Log LOG = LogFactory.getLog(TezTestServiceProtocolServerImpl.class);
+
+  private static final int NUM_HANDLERS = 3;
+
+  private final ContainerRunner containerRunner;
+  private RPC.Server server;
+  private final AtomicReference<InetSocketAddress> bindAddress;
+
+  public TezTestServiceProtocolServerImpl(ContainerRunner containerRunner,
+      AtomicReference<InetSocketAddress> address) {
+    super(TezTestServiceProtocolServerImpl.class.getSimpleName());
+    this.containerRunner = containerRunner;
+    this.bindAddress = address;
+  }
+
+  /** Queues the container; an IOException from the runner is returned to the caller. */
+  @Override
+  public RunContainerResponseProto runContainer(RpcController controller,
+      RunContainerRequestProto request) throws ServiceException {
+    // Requests carry serialized credentials, so only identifying fields are logged.
+    LOG.info("Received request: " + request.getApplicationIdString() + ", containerId=" +
+        request.getContainerIdString());
+    try {
+      containerRunner.queueContainer(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return RunContainerResponseProto.getDefaultInstance();
+  }
+
+  /** Submits the work unit; an IOException from the runner is returned to the caller. */
+  @Override
+  public SubmitWorkResponseProto submitWork(RpcController controller,
+      TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws ServiceException {
+    // Requests carry serialized credentials, so only identifying fields are logged.
+    LOG.info("Received submitWork request: " + request.getApplicationIdString() +
+        ", containerId=" + request.getContainerIdString());
+    try {
+      containerRunner.submitWork(request);
+    } catch (IOException e) {
+      // Previously swallowed (printStackTrace), which made failed submissions look successful.
+      throw new ServiceException(e);
+    }
+    return SubmitWorkResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public void serviceStart() {
+    Configuration conf = getConfig();
+
+    // Port 0: bind to an ephemeral port; the actual address is published below.
+    InetSocketAddress addr = new InetSocketAddress(0);
+
+    try {
+      server = createServer(TezTestServiceProtocolBlockingPB.class, addr, conf, NUM_HANDLERS,
+          TezTestServiceProtocolProtos.TezTestServiceProtocol.newReflectiveBlockingService(this));
+      server.start();
+    } catch (IOException e) {
+      LOG.error("Failed to run RPC Server", e);
+      throw new RuntimeException(e);
+    }
+
+    InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
+    this.bindAddress.set(NetUtils.createSocketAddrForHost(
+        serverBindAddress.getAddress().getCanonicalHostName(),
+        serverBindAddress.getPort()));
+    LOG.info("Instantiated TezTestServiceListener at " + bindAddress.get());
+  }
+
+  @Override
+  public void serviceStop() {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  InetSocketAddress getBindAddress() {
+    return this.bindAddress.get();
+  }
+
+  /** Builds (but does not start) a protobuf RPC server for the protocol on {@code addr}. */
+  private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
+      int numHandlers, BlockingService blockingService) throws IOException {
+    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(pbProtocol)
+        .setInstance(blockingService)
+        .setBindAddress(addr.getHostName())
+        .setPort(addr.getPort())
+        .setNumHandlers(numHandlers)
+        .build();
+    // TODO Add security.
+    return rpcServer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
new file mode 100644
index 0000000..65588fe
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedChunkedFile.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+
+ private ReadaheadRequest readaheadRequest;
+
+ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+ int chunkSize, boolean manageOsCache, int readaheadLength,
+ ReadaheadPool readaheadPool, String identifier) throws IOException {
+ super(file, position, count, chunkSize);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Object nextChunk() throws Exception {
+ if (manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool
+ .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+ getEndOffset(), readaheadRequest);
+ }
+ return super.nextChunk();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+ fd,
+ getStartOffset(), getEndOffset() - getStartOffset(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
new file mode 100644
index 0000000..bdffe52
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/FadvisedFileRegion.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
+ private ReadaheadRequest readaheadRequest;
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
+ super(file.getChannel(), position, count);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position)
+ throws IOException {
+ if (manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+ getPosition() + position, readaheadLength,
+ getPosition() + getCount(), readaheadRequest);
+ }
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
+ }
+
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
+ }
+
+
+ @Override
+ public void releaseExternalResources() {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ super.releaseExternalResources();
+ }
+
+ /**
+ * Call when the transfer completes successfully so we can advise the OS that
+ * we don't need the region to be cached anymore.
+ */
+ public void transferSuccessful() {
+ if (manageOsCache && getCount() > 0) {
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+ fd, getPosition(), getCount(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
new file mode 100644
index 0000000..9a51ca0
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/IndexCache.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.shufflehandler;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+
+class IndexCache {
+
+ private final Configuration conf;
+ private final int totalMemoryAllowed;
+ private AtomicInteger totalMemoryUsed = new AtomicInteger();
+ private static final Log LOG = LogFactory.getLog(IndexCache.class);
+
+ private final ConcurrentHashMap<String,IndexInformation> cache =
+ new ConcurrentHashMap<String,IndexInformation>();
+
+ private final LinkedBlockingQueue<String> queue =
+ new LinkedBlockingQueue<String>();
+
+ public IndexCache(Configuration conf) {
+ this.conf = conf;
+ totalMemoryAllowed = 10 * 1024 * 1024;
+ LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
+ }
+
+ /**
+ * This method gets the index information for the given mapId and reduce.
+ * It reads the index file into cache if it is not already present.
+ * @param mapId
+ * @param reduce
+ * @param fileName The file to read the index information from if it is not
+ * already present in the cache
+ * @param expectedIndexOwner The expected owner of the index file
+ * @return The Index Information
+ * @throws IOException
+ */
+ public TezIndexRecord getIndexInformation(String mapId, int reduce,
+ Path fileName, String expectedIndexOwner)
+ throws IOException {
+
+ IndexInformation info = cache.get(mapId);
+
+ if (info == null) {
+ info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
+ } else {
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
+ }
+ }
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+ }
+
+ if (info.mapSpillRecord.size() == 0 ||
+ info.mapSpillRecord.size() <= reduce) {
+ throw new IOException("Invalid request " +
+ " Map Id = " + mapId + " Reducer = " + reduce +
+ " Index Info Length = " + info.mapSpillRecord.size());
+ }
+ return info.mapSpillRecord.getIndex(reduce);
+ }
+
+ private boolean isUnderConstruction(IndexInformation info) {
+ synchronized(info) {
+ return (null == info.mapSpillRecord);
+ }
+ }
+
+ private IndexInformation readIndexFileToCache(Path indexFileName,
+ String mapId,
+ String expectedIndexOwner)
+ throws IOException {
+ IndexInformation info;
+ IndexInformation newInd = new IndexInformation();
+ if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
+ }
+ }
+ LOG.debug("IndexCache HIT: MapId " + mapId + " found");
+ return info;
+ }
+ LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+ TezSpillRecord tmp = null;
+ try {
+ tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
+ } catch (Throwable e) {
+ tmp = new TezSpillRecord(0);
+ cache.remove(mapId);
+ throw new IOException("Error Reading IndexFile", e);
+ } finally {
+ synchronized (newInd) {
+ newInd.mapSpillRecord = tmp;
+ newInd.notifyAll();
+ }
+ }
+ queue.add(mapId);
+
+ if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
+ freeIndexInformation();
+ }
+ return newInd;
+ }
+
+ /**
+ * This method removes the map from the cache if index information for this
+ * map is loaded(size>0), index information entry in cache will not be
+ * removed if it is in the loading phrase(size=0), this prevents corruption
+ * of totalMemoryUsed. It should be called when a map output on this tracker
+ * is discarded.
+ * @param mapId The taskID of this map.
+ */
+ public void removeMap(String mapId) {
+ IndexInformation info = cache.get(mapId);
+ if (info == null || ((info != null) && isUnderConstruction(info))) {
+ return;
+ }
+ info = cache.remove(mapId);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
+ if (!queue.remove(mapId)) {
+ LOG.warn("Map ID" + mapId + " not found in queue!!");
+ }
+ } else {
+ LOG.info("Map ID " + mapId + " not found in cache");
+ }
+ }
+
+ /**
+ * This method checks if cache and totolMemoryUsed is consistent.
+ * It is only used for unit test.
+ * @return True if cache and totolMemoryUsed is consistent
+ */
+ boolean checkTotalMemoryUsed() {
+ int totalSize = 0;
+ for (IndexInformation info : cache.values()) {
+ totalSize += info.getSize();
+ }
+ return totalSize == totalMemoryUsed.get();
+ }
+
+ /**
+ * Bring memory usage below totalMemoryAllowed.
+ */
+ private synchronized void freeIndexInformation() {
+ while (totalMemoryUsed.get() > totalMemoryAllowed) {
+ String s = queue.remove();
+ IndexInformation info = cache.remove(s);
+ if (info != null) {
+ totalMemoryUsed.addAndGet(-info.getSize());
+ }
+ }
+ }
+
+ private static class IndexInformation {
+ TezSpillRecord mapSpillRecord;
+
+ int getSize() {
+ return mapSpillRecord == null
+ ? 0
+ : mapSpillRecord.size() * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e8ee895/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
new file mode 100644
index 0000000..cc82d74
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -0,0 +1,840 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.shufflehandler;
+
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import javax.crypto.SecretKey;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+public class ShuffleHandler {
+
+ private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+
+ public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs";
+
+ public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+ public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+ public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+ public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+ // pattern to identify errors related to the client closing the socket early
+ // idea borrowed from Netty SslHandler
+ private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
+ "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
+ Pattern.CASE_INSENSITIVE);
+
+ private int port;
+ private final ChannelFactory selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup();
+ protected HttpPipelineFactory pipelineFact;
+ private final int sslFileBufferSize;
+ private final Configuration conf;
+
+ private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
+
+ /**
+ * Should the shuffle use posix_fadvise calls to manage the OS cache during
+ * sendfile
+ */
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final int maxShuffleConnections;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+ private Map<String,String> userRsrc;
+ private JobTokenSecretManager secretManager;
+
+ // TODO Fix this for tez.
+ public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+ "mapreduce_shuffle";
+
+ public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
+ public static final int DEFAULT_SHUFFLE_PORT = 15551;
+
+ // TODO Change configs to remove mapreduce references.
+ public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
+ "mapreduce.shuffle.connection-keep-alive.enable";
+ public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
+
+ public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
+ "mapreduce.shuffle.connection-keep-alive.timeout";
+ public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds
+
+ public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+ "mapreduce.shuffle.mapoutput-info.meta.cache.size";
+ public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+ 1000;
+
+ public static final String CONNECTION_CLOSE = "close";
+
+ public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+ "mapreduce.shuffle.ssl.file.buffer.size";
+
+ public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+ public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
+ public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
+
+ public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
+ // 0 implies Netty default of 2 * number of available processors
+ public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+
+ public static final String SHUFFLE_BUFFER_SIZE =
+ "mapreduce.shuffle.transfer.buffer.size";
+ public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+ public static final String SHUFFLE_TRANSFERTO_ALLOWED =
+ "mapreduce.shuffle.transferTo.allowed";
+ public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+ public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
+ false;
+
+ final boolean connectionKeepAliveEnabled;
+ final int connectionKeepAliveTimeOut;
+ final int mapOutputMetaInfoCacheSize;
+ private static final AtomicBoolean started = new AtomicBoolean(false);
+ private static final AtomicBoolean initing = new AtomicBoolean(false);
+ private static ShuffleHandler INSTANCE;
+
+ @Metrics(about="Shuffle output metrics", context="mapred")
+ static class ShuffleMetrics implements ChannelFutureListener {
+ @Metric("Shuffle output in bytes")
+ MutableCounterLong shuffleOutputBytes;
+ @Metric("# of failed shuffle outputs")
+ MutableCounterInt shuffleOutputsFailed;
+ @Metric("# of succeeeded shuffle outputs")
+ MutableCounterInt shuffleOutputsOK;
+ @Metric("# of current shuffle connections")
+ MutableGaugeInt shuffleConnections;
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ shuffleOutputsOK.incr();
+ } else {
+ shuffleOutputsFailed.incr();
+ }
+ shuffleConnections.decr();
+ }
+ }
+
+ public ShuffleHandler(Configuration conf) {
+ this.conf = conf;
+ manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+ DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+ readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+ DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+ maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
+ DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+ int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
+ DEFAULT_MAX_SHUFFLE_THREADS);
+ if (maxShuffleThreads == 0) {
+ maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
+ }
+
+ shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+ DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+ shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+ (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+ DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
+
+ ThreadFactory bossFactory = new ThreadFactoryBuilder()
+ .setNameFormat("ShuffleHandler Netty Boss #%d")
+ .build();
+ ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setNameFormat("ShuffleHandler Netty Worker #%d")
+ .build();
+
+ selector = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(bossFactory),
+ Executors.newCachedThreadPool(workerFactory),
+ maxShuffleThreads);
+
+ sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ connectionKeepAliveEnabled =
+ conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+ DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+ connectionKeepAliveTimeOut =
+ Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+ DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+ mapOutputMetaInfoCacheSize =
+ Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+ DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
+
+ userRsrc = new ConcurrentHashMap<String,String>();
+ secretManager = new JobTokenSecretManager();
+ }
+
+
+ public void start() throws Exception {
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ try {
+ pipelineFact = new HttpPipelineFactory(conf);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ bootstrap.setPipelineFactory(pipelineFact);
+ port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+ Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ accepted.add(ch);
+ port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+ pipelineFact.SHUFFLE.setPort(port);
+ LOG.info("TezShuffleHandler" + " listening on port " + port);
+ }
+
+ public static void initializeAndStart(Configuration conf) throws Exception {
+ if (!initing.getAndSet(true)) {
+ INSTANCE = new ShuffleHandler(conf);
+ INSTANCE.start();
+ started.set(true);
+ }
+ }
+
+ public static ShuffleHandler get() {
+ Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started");
+ return INSTANCE;
+ }
+
+ /**
+ * Serialize the shuffle port into a ByteBuffer for use later on.
+ * @param port the port to be sent to the ApplciationMaster
+ * @return the serialized form of the port.
+ */
+ public static ByteBuffer serializeMetaData(int port) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer port_dob = new DataOutputBuffer();
+ port_dob.writeInt(port);
+ return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ }
+
+ /**
+ * A helper function to deserialize the metadata returned by ShuffleHandler.
+ * @param meta the metadata returned by the ShuffleHandler
+ * @return the port the Shuffle Handler is listening on to serve shuffle data.
+ */
+ public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+ //TODO this should be returning a class not just an int
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ int port = in.readInt();
+ return port;
+ }
+
+ /**
+ * A helper function to serialize the JobTokenIdentifier to be sent to the
+ * ShuffleHandler as ServiceData.
+ * @param jobToken the job token to be used for authentication of
+ * shuffle data requests.
+ * @return the serialized version of the jobToken.
+ */
+ public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+ jobToken.write(jobToken_dob);
+ return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+ }
+
+ static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(secret);
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+ jt.readFields(in);
+ return jt;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken,
+ String user) {
+ Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));
+ if (registered == null) {
+ recordJobShuffleInfo(applicationIdString, user, appToken);
+ }
+ }
+
+ public void unregisterApplication(String applicationIdString) {
+ removeJobShuffleInfo(applicationIdString);
+ }
+
+
+ public void stop() throws Exception {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ if (selector != null) {
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap.releaseExternalResources();
+ }
+ if (pipelineFact != null) {
+ pipelineFact.destroy();
+ }
+ }
+
+ protected Shuffle getShuffle(Configuration conf) {
+ return new Shuffle(conf);
+ }
+
+
+ private void addJobToken(String appIdString, String user,
+ Token<JobTokenIdentifier> jobToken) {
+ String jobIdString = appIdString.replace("application", "job");
+ userRsrc.put(jobIdString, user);
+ secretManager.addTokenForJob(jobIdString, jobToken);
+ LOG.info("Added token for " + jobIdString);
+ }
+
+ private void recordJobShuffleInfo(String appIdString, String user,
+ Token<JobTokenIdentifier> jobToken) {
+ addJobToken(appIdString, user, jobToken);
+ }
+
+ private void removeJobShuffleInfo(String appIdString) {
+ secretManager.removeTokenForJob(appIdString);
+ userRsrc.remove(appIdString);
+ }
+
+  /**
+   * Builds the Netty pipeline for each accepted shuffle connection. The stages, in order, are:
+   * optional SSL, HTTP request decoding and chunk aggregation, response encoding, chunked
+   * writes, and finally the single {@link Shuffle} handler shared by all pipelines.
+   */
+  class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    /** Request handler instance added to every pipeline this factory creates. */
+    final Shuffle SHUFFLE;
+    // Never assigned while SSL setup below is disabled, so pipelines are currently plain HTTP.
+    private SSLFactory sslFactory;
+
+    public HttpPipelineFactory(Configuration conf) throws Exception {
+      SHUFFLE = getShuffle(conf);
+      // TODO Setup SSL Shuffle: when MRConfig.SHUFFLE_SSL_ENABLED_KEY is enabled, create an
+      // SSLFactory in SSLFactory.Mode.SERVER, init() it, and assign it to sslFactory.
+    }
+
+    /** Releases the SSL factory, if one was created. */
+    public void destroy() {
+      if (sslFactory == null) {
+        return;
+      }
+      sslFactory.destroy();
+    }
+
+    // TODO factor security manager into pipeline
+    // TODO factor out encode/decode to permit binary shuffle
+    // TODO factor out decode of index to permit alt. models
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      final ChannelPipeline p = Channels.pipeline();
+      if (sslFactory != null) {
+        p.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      p.addLast("decoder", new HttpRequestDecoder());
+      // Aggregates chunked requests up to 64 KiB of content.
+      p.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      p.addLast("encoder", new HttpResponseEncoder());
+      p.addLast("chunking", new ChunkedWriteHandler());
+      p.addLast("shuffle", SHUFFLE);
+      return p;
+    }
+  }
+
+ class Shuffle extends SimpleChannelUpstreamHandler {
+
+ private final Configuration conf;
+ private final IndexCache indexCache;
+ private final LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);
+ private int port;
+
+ public Shuffle(Configuration conf) {
+ this.conf = conf;
+ indexCache = new IndexCache(conf);
+ this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ private List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<String>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ throws Exception {
+ if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+ LOG.info(String.format("Current number of shuffle connections (%d) is " +
+ "greater than or equal to the max allowed shuffle connections (%d)",
+ accepted.size(), maxShuffleConnections));
+ evt.getChannel().close();
+ return;
+ }
+ accepted.add(evt.getChannel());
+ super.channelOpen(ctx, evt);
+
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ throws Exception {
+ HttpRequest request = (HttpRequest) evt.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+ // Check whether the shuffle version is compatible
+ if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+ request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+ || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+ request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+ }
+ final Map<String,List<String>> q =
+ new QueryStringDecoder(request.getUri()).getParameters();
+ final List<String> keepAliveList = q.get("keepAlive");
+ boolean keepAliveParam = false;
+ if (keepAliveList != null && keepAliveList.size() == 1) {
+ keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("KeepAliveParam : " + keepAliveList
+ + " : " + keepAliveParam);
+ }
+ }
+ final List<String> mapIds = splitMaps(q.get("map"));
+ final List<String> reduceQ = q.get("reduce");
+ final List<String> jobQ = q.get("job");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RECV: " + request.getUri() +
+ "\n mapId: " + mapIds +
+ "\n reduceId: " + reduceQ +
+ "\n jobId: " + jobQ +
+ "\n keepAlive: " + keepAliveParam);
+ }
+
+ if (mapIds == null || reduceQ == null || jobQ == null) {
+ sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+ return;
+ }
+ if (reduceQ.size() != 1 || jobQ.size() != 1) {
+ sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+ return;
+ }
+ int reduceId;
+ String jobId;
+ try {
+ reduceId = Integer.parseInt(reduceQ.get(0));
+ jobId = jobQ.get(0);
+ } catch (NumberFormatException e) {
+ sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+ return;
+ } catch (IllegalArgumentException e) {
+ sendError(ctx, "Bad job parameter", BAD_REQUEST);
+ return;
+ }
+ final String reqUri = request.getUri();
+ if (null == reqUri) {
+ // TODO? add upstream?
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ try {
+ verifyRequest(jobId, ctx, request, response,
+ new URL("http", "", this.port, reqUri));
+ } catch (IOException e) {
+ LOG.warn("Shuffle failure ", e);
+ sendError(ctx, e.getMessage(), UNAUTHORIZED);
+ return;
+ }
+
+ Map<String, MapOutputInfo> mapOutputInfoMap =
+ new HashMap<String, MapOutputInfo>();
+ Channel ch = evt.getChannel();
+ String user = userRsrc.get(jobId);
+
+ // $x/$user/appcache/$appId/output/$mapId
+ // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+ // between App and Job
+ String outputBasePathStr = getBaseLocation(jobId, user);
+
+ try {
+ populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+ response, keepAliveParam, mapOutputInfoMap);
+ } catch(IOException e) {
+ ch.write(response);
+ LOG.error("Shuffle error in populating headers :", e);
+ String errorMessage = getErrorMessage(e);
+ sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+ return;
+ }
+ ch.write(response);
+ // TODO refactor the following into the pipeline
+ ChannelFuture lastMap = null;
+ for (String mapId : mapIds) {
+ try {
+ MapOutputInfo info = mapOutputInfoMap.get(mapId);
+ if (info == null) {
+ info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+ }
+ lastMap =
+ sendMapOutput(ctx, ch, user, mapId,
+ reduceId, info);
+ if (null == lastMap) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ } catch (IOException e) {
+ LOG.error("Shuffle error :", e);
+ String errorMessage = getErrorMessage(e);
+ sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+ return;
+ }
+ }
+ lastMap.addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private String getErrorMessage(Throwable t) {
+ StringBuffer sb = new StringBuffer(t.getMessage());
+ while (t.getCause() != null) {
+ sb.append(t.getCause().getMessage());
+ t = t.getCause();
+ }
+ return sb.toString();
+ }
+
+ private final String USERCACHE_CONSTANT = "usercache";
+ private final String APPCACHE_CONSTANT = "appcache";
+
+ private String getBaseLocation(String jobIdString, String user) {
+ String parts[] = jobIdString.split("_");
+ Preconditions.checkArgument(parts.length == 3, "Invalid jobId. Expecting 3 parts");
+ final ApplicationId appID =
+ ApplicationId.newInstance(Long.parseLong(parts[1]), Integer.parseInt(parts[2]));
+ final String baseStr =
+ USERCACHE_CONSTANT + "/" + user + "/"
+ + APPCACHE_CONSTANT + "/"
+ + ConverterUtils.toString(appID) + "/output" + "/";
+ return baseStr;
+ }
+
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+ TezIndexRecord info =
+ indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
+ Path mapOutputFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+ }
+ MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+ return outputInfo;
+ }
+
+ protected void populateHeaders(List<String> mapIds, String outputBaseStr,
+ String user, int reduce, HttpRequest request, HttpResponse response,
+ boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
+ throws IOException {
+
+ long contentLength = 0;
+ for (String mapId : mapIds) {
+ String base = outputBaseStr + mapId;
+ MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+ if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
+ mapOutputInfoMap.put(mapId, outputInfo);
+ }
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+ TezIndexRecord info =
+ indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+ ShuffleHeader header =
+ new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+
+ contentLength += info.getPartLength();
+ contentLength += dob.getLength();
+ }
+
+ // Now set the response headers.
+ setResponseHeaders(response, keepAliveParam, contentLength);
+ }
+
+ protected void setResponseHeaders(HttpResponse response,
+ boolean keepAliveParam, long contentLength) {
+ if (!connectionKeepAliveEnabled && !keepAliveParam) {
+ LOG.info("Setting connection close header...");
+ response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+ } else {
+ response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+ String.valueOf(contentLength));
+ response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+ + connectionKeepAliveTimeOut);
+ LOG.info("Content Length in shuffle : " + contentLength);
+ }
+ }
+
+ class MapOutputInfo {
+ final Path mapOutputFileName;
+ final TezIndexRecord indexRecord;
+
+ MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
+ this.mapOutputFileName = mapOutputFileName;
+ this.indexRecord = indexRecord;
+ }
+ }
+
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+ if (null == tokenSecret) {
+ LOG.info("Request for unknown token " + appid);
+ throw new IOException("could not find jobid");
+ }
+ // string to encrypt
+ String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+ // hash from the fetcher
+ String urlHashStr =
+ request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ if (urlHashStr == null) {
+ LOG.info("Missing header hash for " + appid);
+ throw new IOException("fetcher cannot be authenticated");
+ }
+ if (LOG.isDebugEnabled()) {
+ int len = urlHashStr.length();
+ LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+ urlHashStr.substring(len-len/2, len-1));
+ }
+ // verify - throws exception
+ SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+ // verification passed - encode the reply
+ String reply =
+ SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
+ tokenSecret);
+ response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ // Put shuffle version into http header
+ response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ if (LOG.isDebugEnabled()) {
+ int len = reply.length();
+ LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
+ reply.substring(len-len/2, len-1));
+ }
+ }
+
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+ String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
+ throws IOException {
+ final TezIndexRecord info = mapOutputInfo.indexRecord;
+ final ShuffleHeader header =
+ new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
+ final DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ final File spillfile =
+ new File(mapOutputInfo.mapOutputFileName.toString());
+ RandomAccessFile spill;
+ try {
+ spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
+ } catch (FileNotFoundException e) {
+ LOG.info(spillfile + " not found");
+ return null;
+ }
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) == null) {
+ final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+ info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength,
+ readaheadPool, spillfile.getAbsolutePath(),
+ shuffleBufferSize, shuffleTransferToAllowed);
+ writeFuture = ch.write(partition);
+ writeFuture.addListener(new ChannelFutureListener() {
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ partition.transferSuccessful();
+ }
+ partition.releaseExternalResources();
+ }
+ });
+ } else {
+ // HTTPS cannot be done with zero copy.
+ final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+ info.getStartOffset(), info.getPartLength(), sslFileBufferSize,
+ manageOsCache, readaheadLength, readaheadPool,
+ spillfile.getAbsolutePath());
+ writeFuture = ch.write(chunk);
+ }
+ return writeFuture;
+ }
+
+ protected void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ sendError(ctx, "", status);
+ }
+
+ protected void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ // Put shuffle version into http header
+ response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ response.setContent(
+ ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ } else if (cause instanceof IOException) {
+ if (cause instanceof ClosedChannelException) {
+ LOG.debug("Ignoring closed channel error", cause);
+ return;
+ }
+ String message = String.valueOf(cause.getMessage());
+ if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+ LOG.debug("Ignoring client socket close", cause);
+ return;
+ }
+ }
+
+ LOG.error("Shuffle error: ", cause);
+ if (ch.isConnected()) {
+ LOG.error("Shuffle error " + e);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+ }
+}