You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [35/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test helpers that build Mockito mocks of the Tez ID hierarchy
+ * (DAG id, vertex id, task id, task attempt id).
+ *
+ * Every helper creates the nested mock BEFORE opening a {@code when(...)}
+ * call on the outer mock: stubbing a second mock while another stubbing is
+ * still unfinished makes Mockito throw {@code UnfinishedStubbingException}.
+ */
+public class TezTestUtils {
+
+  /**
+   * Builds a mock task attempt id.
+   *
+   * @param jobId         id used for the mocked DAG and its ApplicationId
+   * @param taskId        value returned by the mocked task's getId()
+   * @param taskAttemptId value returned by the mocked attempt's getId()
+   * @param type          MAP yields the "m" tag / vertex 0, anything else
+   *                      yields the "r" tag / vertex 1
+   * @return a mock whose toString() is
+   *         attempt_tez_[jobId]_[m|r]_[taskId]_[taskAttemptId]
+   */
+  public static TezTaskAttemptID getMockTaskAttemptId(
+      int jobId, int taskId, int taskAttemptId, MRTaskType type) {
+    TezTaskID taskID = getMockTaskId(jobId, taskId, type);
+    String typeTag = (type == MRTaskType.MAP) ? "m" : "r";
+    String attemptName =
+        "attempt_tez_" + jobId + "_" + typeTag + "_" + taskId + "_" + taskAttemptId;
+
+    TezTaskAttemptID taskAttemptID = mock(TezTaskAttemptID.class);
+    when(taskAttemptID.getTaskID()).thenReturn(taskID);
+    when(taskAttemptID.getId()).thenReturn(taskAttemptId);
+    when(taskAttemptID.toString()).thenReturn(attemptName);
+    return taskAttemptID;
+  }
+
+  /**
+   * Builds a mock task id that reports {@code taskId} and a mocked vertex id
+   * derived from {@code jobId} and {@code type}.
+   */
+  public static TezTaskID getMockTaskId(int jobId, int taskId, MRTaskType type) {
+    TezVertexID vertexID = getMockVertexId(jobId, type);
+    TezTaskID taskID = mock(TezTaskID.class);
+    when(taskID.getVertexID()).thenReturn(vertexID);
+    when(taskID.getId()).thenReturn(taskId);
+    return taskID;
+  }
+
+  /**
+   * Builds a mock DAG id that reports {@code jobId} and an ApplicationId
+   * record with cluster timestamp 0 and id {@code jobId}.
+   */
+  public static TezDAGID getMockJobId(int jobId) {
+    ApplicationId appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(0L);
+    appId.setId(jobId);
+
+    TezDAGID jobID = mock(TezDAGID.class);
+    when(jobID.getId()).thenReturn(jobId);
+    when(jobID.getApplicationId()).thenReturn(appId);
+    return jobID;
+  }
+
+  /**
+   * Builds a mock vertex id: id 0 for MAP, 1 otherwise, with a mocked DAG id.
+   */
+  public static TezVertexID getMockVertexId(int jobId, MRTaskType type) {
+    // Create the DAG mock up front. Calling getMockJobId() as the argument of
+    // thenReturn() would stub the DAG mock while the vertex stubbing below is
+    // still open, which Mockito reports as an unfinished stubbing.
+    TezDAGID dagID = getMockJobId(jobId);
+    TezVertexID vertexID = mock(TezVertexID.class);
+    when(vertexID.getDAGId()).thenReturn(dagID);
+    when(vertexID.getId()).thenReturn(type == MRTaskType.MAP ? 0 : 1);
+    return vertexID;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Test helper that writes a small SequenceFile, turns it into an input split
+ * and drives a map-side Tez task over it.
+ */
+public class MapUtils {
+
+  private static final Log LOG = LogFactory.getLog(MapUtils.class);
+
+  /**
+   * Writes ten (LongWritable, Text) records to {@code file} and returns the
+   * first split that SequenceFileInputFormat computes for {@code workDir}.
+   *
+   * @param fs      file system the SequenceFile is written to
+   * @param workDir directory registered as the job's input path
+   * @param job     job configuration; its input paths are overwritten
+   * @param file    SequenceFile to create
+   * @return the first computed input split
+   * @throws IOException if writing fails or no split is produced
+   */
+  private static InputSplit
+  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
+      throws IOException {
+    FileInputFormat.setInputPaths(job, workDir);
+
+    // Keys are random in [0, 1000); log the seed so a failing run can be
+    // reproduced.
+    long seed = System.currentTimeMillis();
+    LOG.info("random seed = " + seed);
+
+    // create a file with ten entries; values count down from "10" to "1"
+    SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, job, file,
+            LongWritable.class, Text.class);
+    try {
+      Random r = new Random(seed);
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = 10; i > 0; i--) {
+        key.set(r.nextInt(1000));
+        value.set(Integer.toString(i));
+        writer.append(key, value);
+        LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
+      }
+    } finally {
+      writer.close();
+    }
+
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(job, 1);
+    if (splits.length == 0) {
+      throw new IOException("No input split generated for " + file);
+    }
+    InputSplit split = splits[0];
+    String[] locations = split.getLocations();
+    // Diagnostics go through the class logger instead of System.err, and the
+    // length is labelled as a length (it used to be labelled "off").
+    LOG.info("#split = " + splits.length + " ; " +
+        "#locs = " + locations.length + "; " +
+        "loc = " + (locations.length > 0 ? locations[0] : "<none>") + "; " +
+        "len = " + split.getLength() + "; " +
+        "file = " + ((FileSplit)split).getPath());
+    return split;
+  }
+
+  /**
+   * Creates a map-side task through the Guice {@code taskModule}, initializes
+   * it and runs its processor over a freshly generated input split.
+   *
+   * The task's SimpleInput is wrapped in a Mockito spy so that
+   * getOldSplitDetails(...) returns the generated split. The returned task
+   * has NOT been closed; the caller is expected to close it.
+   *
+   * @param fs         file system used for the generated input
+   * @param workDir    directory used as the job input path
+   * @param jobConf    job configuration; input format and paths are set here
+   * @param mapId      task id used for the mocked task attempt id
+   * @param mapInput   SequenceFile to create as map input
+   * @param taskModule Guice module providing the TezEngineFactory binding
+   * @param umbilical  umbilical passed to Task.initialize
+   * @return the task after its processor has run
+   * @throws Exception on any failure while preparing or running the task
+   */
+  public static Task runMapProcessor(FileSystem fs, Path workDir,
+      JobConf jobConf,
+      int mapId, Path mapInput, AbstractModule taskModule,
+      TezTaskUmbilicalProtocol umbilical)
+      throws Exception {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+    TezEngineTask taskContext =
+        new TezEngineTask(
+            TezTestUtils.getMockTaskAttemptId(0, mapId, 0, MRTaskType.MAP), "tez",
+            "tez", "TODO_vertexName", InitialTaskWithLocalSort.class.getName(),
+            null, null);
+
+    Injector injector = Guice.createInjector(taskModule);
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(jobConf, umbilical);
+
+    // Spy on the real input so only the split lookup is replaced.
+    SimpleInput real = ((SimpleInput)t.getInput());
+    SimpleInput in = spy(real);
+    doReturn(split).when(in).getOldSplitDetails(any(TaskSplitIndex.class));
+    t.getProcessor().process(in, t.getOutput());
+    return t;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.map;
+
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.TruncatedChannelBuffer;
+import org.jboss.netty.handler.stream.ChunkedStream;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Runs a map-side task over generated input and checks its output, once with
+ * the local-sort task module and once with the in-memory-sort module.
+ */
+public class TestMapProcessor {
+
+  private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
+
+  JobConf job;
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  // Scratch directory: ${test.build.data}/TestMapProcessor, /tmp by default.
+  private static Path workDir =
+      new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestMapProcessor").makeQualified(localFs);
+
+  TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+
+  /** Builds a fresh single-reducer job configuration rooted at workDir. */
+  @Before
+  public void setUp() {
+    job = new JobConf(defaultConf);
+    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.setClass(
+        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezLocalTaskOutputFiles.class,
+        TezTaskOutput.class);
+    job.setNumReduceTasks(1);
+    mapOutputs.setConf(job);
+  }
+
+  /**
+   * Runs the map with local sort and verifies that the keys in the map
+   * output file are in non-decreasing order.
+   */
+  @Test
+  public void testMapProcessor() throws Exception {
+    localFs.delete(workDir, true);
+    MapUtils.runMapProcessor(
+        localFs, workDir, job, 0, new Path(workDir, "map0"),
+        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol()).close();
+
+    Path mapOutputFile = mapOutputs.getInputFile(0);
+    LOG.info("mapOutputFile = " + mapOutputFile);
+    IFile.Reader reader =
+        new IFile.Reader(job, localFs, mapOutputFile, null, null);
+    try {
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      DataInputBuffer keyBuf = new DataInputBuffer();
+      DataInputBuffer valueBuf = new DataInputBuffer();
+      // Long.MIN_VALUE is <= every key, so the first record always passes.
+      // prev must be advanced on EVERY record; the previous version only
+      // updated it inside a guard that could never become true, so the order
+      // was never actually checked. A JUnit assertion is used because the
+      // Java assert keyword is a no-op unless the JVM runs with -ea.
+      long prev = Long.MIN_VALUE;
+      while (reader.nextRawKey(keyBuf)) {
+        reader.nextRawValue(valueBuf);
+        key.readFields(keyBuf);
+        value.readFields(valueBuf);
+        Assert.assertTrue(
+            "map output not sorted: " + prev + " precedes " + key.get(),
+            prev <= key.get());
+        prev = key.get();
+        LOG.info("key = " + key.get() + "; value = " + value);
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Runs the map with in-memory sort and two partitions, then checks for each
+   * partition that the sorted stream yields the advertised number of bytes
+   * for several chunk sizes.
+   */
+  @Test
+  public void testMapProcessorWithInMemSort() throws Exception {
+    final int partitions = 2;
+    job.setNumReduceTasks(partitions);
+    job.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, partitions);
+
+    localFs.delete(workDir, true);
+    Task t =
+        MapUtils.runMapProcessor(
+            localFs, workDir, job, 0, new Path(workDir, "map0"),
+            new InitialTaskWithInMemSort(), new TestUmbilicalProtocol(true));
+    InMemorySortedOutput output = (InMemorySortedOutput)t.getOutput();
+
+    // One large chunk size first, then powers of two from 2 to 128.
+    verifyInMemSortedStream(output, 0, 4096);
+    int i = 0;
+    for (i = 2; i < 256; i <<= 1) {
+      verifyInMemSortedStream(output, 0, i);
+    }
+    verifyInMemSortedStream(output, 1, 4096);
+    for (i = 2; i < 256; i <<= 1) {
+      verifyInMemSortedStream(output, 1, i);
+    }
+
+    t.close();
+  }
+
+  /**
+   * Reads the sorted stream of {@code partition} in chunks of
+   * {@code chunkSize} and asserts that the total number of bytes equals the
+   * compressed length reported by the partition's shuffle header.
+   */
+  private void verifyInMemSortedStream(
+      InMemorySortedOutput output, int partition, int chunkSize)
+      throws Exception {
+    ChunkedStream cs =
+        new ChunkedStream(
+            output.getSorter().getSortedStream(partition), chunkSize);
+    int actualBytes = 0;
+    ChannelBuffer b = null;
+    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
+      LOG.info("b = " + b);
+      // Truncated buffers are counted by capacity, all others by readable
+      // bytes (the cast assumes a heap buffer, as the original did).
+      actualBytes +=
+          (b instanceof TruncatedChannelBuffer) ?
+              ((TruncatedChannelBuffer)b).capacity() :
+              ((BigEndianHeapChannelBuffer)b).readableBytes();
+    }
+
+    LOG.info("verifyInMemSortedStream" +
+        " partition=" + partition +
+        " chunkSize=" + chunkSize +
+        " expected=" +
+        output.getSorter().getShuffleHeader(partition).getCompressedLength() +
+        " actual=" + actualBytes);
+    Assert.assertEquals(
+        output.getSorter().getShuffleHeader(partition).getCompressedLength(),
+        actualBytes);
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.apache.tez.mapreduce.task.LocalFinalTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Runs a map task followed by a reduce-side task (LocalFinalTask module) on
+ * the local file system.
+ */
+public class TestReduceProcessor {
+
+  private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
+
+  JobConf job;
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  // Scratch directory: ${test.build.data}/TestReduceProcessor, /tmp by default.
+  private static Path workDir =
+      new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestReduceProcessor").makeQualified(localFs);
+
+  /** Builds a fresh single-reducer job configuration rooted at workDir. */
+  @Before
+  public void setUp() {
+    job = new JobConf(defaultConf);
+    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.setClass(
+        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezLocalTaskOutputFiles.class,
+        TezTaskOutput.class);
+    job.setNumReduceTasks(1);
+  }
+
+  /**
+   * Runs one map task, then creates, initializes, runs and closes a reduce
+   * task writing to workDir/output. The test passes if nothing throws.
+   */
+  @Test
+  public void testReduceProcessor() throws Exception {
+    localFs.delete(workDir, true);
+
+    // Run a map and close it, exactly as TestMapProcessor does for the same
+    // call, so the map task is not left open while the reduce runs.
+    // NOTE(review): presumably close() is also what finalizes the map output
+    // the reduce consumes - confirm against the Task implementation.
+    MapUtils.runMapProcessor(
+        localFs, workDir, job, 0, new Path(workDir, "map0"),
+        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol()
+        ).close();
+
+    LOG.info("Starting reduce...");
+    FileOutputFormat.setOutputPath(job, new Path(workDir, "output"));
+
+    // Now run a reduce
+    TezEngineTask taskContext = new TezEngineTask(
+        TezTestUtils.getMockTaskAttemptId(0, 0, 0, MRTaskType.REDUCE), "tez",
+        "tez", "TODO_vertexName", LocalFinalTask.class.getName(),
+        Collections.singletonList(new InputSpec("TODO_srcVertexName", 1)), null);
+    job.set(JobContext.TASK_ATTEMPT_ID, taskContext.getTaskAttemptId().toString());
+    Injector injector = Guice.createInjector(new LocalFinalTask());
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, new TestUmbilicalProtocol());
+    t.run();
+    t.close();
+
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties Thu Apr 18 23:54:18 2013
@@ -0,0 +1,19 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+# Repository-wide threshold. The key was previously misspelled
+# "threshhold", which log4j does not recognise.
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Module descriptor for tez-yarn-client; inherits from org.apache.tez:tez 0.2.0-SNAPSHOT. -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-yarn-client</artifactId>
+
+ <!-- No dependency below declares a version or (except junit) a scope;
+      presumably both are managed by the parent POM - verify there. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ </dependency>
+ <!-- junit is the only dependency restricted to the test scope. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- apache-rat-plugin is declared with an empty configuration, i.e. no
+      module-specific settings are given here. -->
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+/**
+ * Hands out one {@link ClientServiceDelegate} per job id and lazily creates
+ * the history-server proxy that is passed to every delegate.
+ *
+ * Access to the delegate map and to the proxy field goes through
+ * synchronized methods.
+ */
+public class ClientCache {
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+
+  private static final Log LOG = LogFactory.getLog(ClientCache.class);
+
+  // The map itself is never replaced, only populated, hence final.
+  private final Map<JobID, ClientServiceDelegate> cache =
+      new HashMap<JobID, ClientServiceDelegate>();
+
+  // Stays null when no history-server address is configured.
+  private MRClientProtocol hsProxy;
+
+  /**
+   * @param conf configuration used for RPC creation and address lookup
+   * @param rm   resource-manager delegate handed to every created client
+   */
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  /**
+   * Returns the cached delegate for {@code jobId}, creating and caching one
+   * on first use.
+   *
+   * @param jobId job whose delegate is requested
+   * @return the delegate for the job
+   * @throws YarnException if creating the history-server proxy fails
+   */
+  //TODO: evict from the cache on some threshold
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    if (hsProxy == null) {
+      try {
+        hsProxy = instantiateHistoryProxy();
+      } catch (IOException e) {
+        LOG.warn("Could not connect to History server.", e);
+        throw new YarnException("Could not connect to History server.", e);
+      }
+    }
+    ClientServiceDelegate client = cache.get(jobId);
+    if (client == null) {
+      client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+      cache.put(jobId, client);
+    }
+    return client;
+  }
+
+  /**
+   * Returns the history-server proxy, creating it if it does not exist yet.
+   *
+   * @return the proxy, or null when no history-server address is configured
+   * @throws IOException if the proxy cannot be created
+   */
+  protected synchronized MRClientProtocol getInitializedHSProxy()
+      throws IOException {
+    if (this.hsProxy == null) {
+      hsProxy = instantiateHistoryProxy();
+    }
+    return this.hsProxy;
+  }
+
+  /**
+   * Creates a history-server proxy as the current user.
+   *
+   * @return a new proxy, or null when JHAdminConfig.MR_HISTORY_ADDRESS is
+   *         unset or empty
+   * @throws IOException if the current user cannot be determined
+   */
+  protected MRClientProtocol instantiateHistoryProxy()
+      throws IOException {
+    final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+    if (StringUtils.isEmpty(serviceAddr)) {
+      return null;
+    }
+    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    MRClientProtocol proxy =
+        currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+          @Override
+          public MRClientProtocol run() {
+            return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+                NetUtils.createSocketAddr(serviceAddr), conf);
+          }
+        });
+    // Report success only once the proxy exists; this used to be logged
+    // before the proxy had been requested at all.
+    LOG.debug("Connected to HistoryServer at: " + serviceAddr);
+    return proxy;
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Per-job client facade: picks the {@link MRClientProtocol} endpoint to use
+ * for one job (a live proxy, the history-server proxy handed to the
+ * constructor, or a local {@link NotRunningJob} stub) and issues job
+ * queries against it.
+ */
+public class ClientServiceDelegate {
+ private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+ private static final String UNAVAILABLE = "N/A";
+ // Cache key used when no application report (and hence no user) is known.
+ private static final String UNKNOWN_USER = "Unknown User";
+
+ // Caches for per-user NotRunningJobs. Final because the map doubles as
+ // the monitor guarding its own updates.
+ private final HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
+
+ private final Configuration conf;
+ private final JobID jobId;
+ private final ApplicationId appId;
+ private final ResourceMgrDelegate rm;
+ private final MRClientProtocol historyServerProxy;
+ private MRClientProtocol realProxy = null;
+ private final RecordFactory recordFactory =
+     RecordFactoryProvider.getRecordFactory(null);
+ private String trackingUrl;
+
+ private boolean amAclDisabledStatusLogged = false;
+
+ /**
+  * @param conf configuration to copy; the copy is modified, the argument
+  *        is not
+  * @param rm delegate used to fetch application reports
+  * @param jobId job this delegate serves
+  * @param historyServerProxy history-server proxy, or {@code null} when
+  *        none is available
+  */
+ public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+     JobID jobId, MRClientProtocol historyServerProxy) {
+   this.conf = new Configuration(conf); // Cloning for modifying.
+   // For faster redirects from AM to HS.
+   this.conf.setInt(
+       CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+       this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+           MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+   this.rm = rm;
+   this.jobId = jobId;
+   this.historyServerProxy = historyServerProxy;
+   this.appId = TypeConverter.toYarn(jobId).getAppId();
+   this.notRunningJobs =
+       new HashMap<JobState, HashMap<String, NotRunningJob>>();
+ }
+
+ /**
+  * Returns the {@link NotRunningJob} stub for the given state and the user
+  * named in the report, creating and caching it on first use. A
+  * {@code null} report is filed under the {@code UNKNOWN_USER} key.
+  */
+ private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
+     JobState state) {
+   synchronized (notRunningJobs) {
+     HashMap<String, NotRunningJob> jobsByUser = notRunningJobs.get(state);
+     if (jobsByUser == null) {
+       jobsByUser = new HashMap<String, NotRunningJob>();
+       notRunningJobs.put(state, jobsByUser);
+     }
+     String userKey;
+     if (applicationReport != null) {
+       userKey = applicationReport.getUser();
+     } else {
+       userKey = UNKNOWN_USER;
+     }
+     NotRunningJob cached = jobsByUser.get(userKey);
+     if (cached != null) {
+       return cached;
+     }
+     NotRunningJob created = new NotRunningJob(applicationReport, state);
+     jobsByUser.put(userKey, created);
+     return created;
+   }
+ }
+
+ /**
+  * Resolves the protocol endpoint to use for this job.
+  *
+  * <p>A previously stored {@code realProxy} is returned as is. Otherwise
+  * the application report is fetched from the resource manager and:
+  * <ul>
+  * <li>no report: the result of {@code checkAndGetHSProxy} is returned;</li>
+  * <li>RUNNING: waits (2 second sleeps) until a host is reported, then
+  * either connects to that host and caches the proxy, or returns a
+  * {@link NotRunningJob} stub when the host is "N/A" or AM access is
+  * disabled in the configuration; a failed connect is retried with a
+  * freshly fetched report;</li>
+  * <li>NEW / SUBMITTED / ACCEPTED / FAILED / KILLED: a
+  * {@link NotRunningJob} stub is returned;</li>
+  * <li>FINISHED: the result of {@code checkAndGetHSProxy} is cached and
+  * returned.</li>
+  * </ul>
+  * NOTE(review): for any other application state the method returns the
+  * current {@code realProxy}, which is {@code null} at that point.
+  *
+  * @throws YarnRemoteException when the report carries no user, or as
+  *         propagated from the resource manager lookup
+  * @throws YarnException when a wait is interrupted; the thread's interrupt
+  *         status is restored before throwing
+  */
+ private MRClientProtocol getProxy() throws YarnRemoteException {
+   if (realProxy != null) {
+     return realProxy;
+   }
+
+   // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
+   // and redirect to the history server.
+   ApplicationReport application = rm.getApplicationReport(appId);
+   if (application != null) {
+     trackingUrl = application.getTrackingUrl();
+   }
+   InetSocketAddress serviceAddr = null;
+   while (application == null
+       || YarnApplicationState.RUNNING == application
+           .getYarnApplicationState()) {
+     if (application == null) {
+       LOG.info("Could not get Job info from RM for job " + jobId
+           + ". Redirecting to job history server.");
+       return checkAndGetHSProxy(null, JobState.NEW);
+     }
+     try {
+       if (application.getHost() == null || "".equals(application.getHost())) {
+         LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
+         Thread.sleep(2000);
+
+         LOG.debug("Application state is " + application.getYarnApplicationState());
+         application = rm.getApplicationReport(appId);
+         continue;
+       } else if (UNAVAILABLE.equals(application.getHost())) {
+         if (!amAclDisabledStatusLogged) {
+           LOG.info("Job " + jobId + " is running, but the host is unknown."
+               + " Verify user has VIEW_JOB access.");
+           amAclDisabledStatusLogged = true;
+         }
+         return getNotRunningJob(application, JobState.RUNNING);
+       }
+       if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
+         UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+             UserGroupInformation.getCurrentUser().getUserName());
+         serviceAddr = NetUtils.createSocketAddrForHost(
+             application.getHost(), application.getRpcPort());
+         if (UserGroupInformation.isSecurityEnabled()) {
+           ClientToken clientToken = application.getClientToken();
+           Token<ClientTokenIdentifier> token =
+               ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
+           newUgi.addToken(token);
+         }
+         LOG.debug("Connecting to " + serviceAddr);
+         final InetSocketAddress finalServiceAddr = serviceAddr;
+         realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+           @Override
+           public MRClientProtocol run() throws IOException {
+             return instantiateAMProxy(finalServiceAddr);
+           }
+         });
+       } else {
+         if (!amAclDisabledStatusLogged) {
+           LOG.info("Network ACL closed to AM for job " + jobId
+               + ". Not going to try to reach the AM.");
+           amAclDisabledStatusLogged = true;
+         }
+         return getNotRunningJob(null, JobState.RUNNING);
+       }
+       return realProxy;
+     } catch (IOException e) {
+       //possibly the AM has crashed
+       //there may be some time before AM is restarted
+       //keep retrying by getting the address from RM
+       LOG.info("Could not connect to " + serviceAddr +
+           ". Waiting for getting the latest AM address...");
+       try {
+         Thread.sleep(2000);
+       } catch (InterruptedException e1) {
+         LOG.warn("getProxy() call interruped", e1);
+         // Keep the interruption visible to callers further up the stack.
+         Thread.currentThread().interrupt();
+         throw new YarnException(e1);
+       }
+       application = rm.getApplicationReport(appId);
+       if (application == null) {
+         LOG.info("Could not get Job info from RM for job " + jobId
+             + ". Redirecting to job history server.");
+         return checkAndGetHSProxy(null, JobState.RUNNING);
+       }
+     } catch (InterruptedException e) {
+       LOG.warn("getProxy() call interruped", e);
+       // Keep the interruption visible to callers further up the stack.
+       Thread.currentThread().interrupt();
+       throw new YarnException(e);
+     }
+   }
+
+   // We just want to return if it is still allocating, so that we don't
+   // block on it. This is to be able to return job status
+   // on an allocating Application.
+   String user = application.getUser();
+   if (user == null) {
+     throw RPCUtil.getRemoteException("User is not set in the application report");
+   }
+   if (application.getYarnApplicationState() == YarnApplicationState.NEW
+       || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
+       || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+     realProxy = null;
+     return getNotRunningJob(application, JobState.NEW);
+   }
+
+   if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
+     realProxy = null;
+     return getNotRunningJob(application, JobState.FAILED);
+   }
+
+   if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+     realProxy = null;
+     return getNotRunningJob(application, JobState.KILLED);
+   }
+
+   //History server can serve a job only if application
+   //succeeded.
+   if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+     LOG.info("Application state is completed. FinalApplicationStatus="
+         + application.getFinalApplicationStatus().toString()
+         + ". Redirecting to job history server");
+     realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
+   }
+   return realProxy;
+ }
+
+ /**
+  * Returns the history-server proxy when one was supplied to this
+  * delegate; otherwise logs a warning and falls back to the
+  * {@link NotRunningJob} stub for the given report and state.
+  */
+ private MRClientProtocol checkAndGetHSProxy(
+     ApplicationReport applicationReport, JobState state) {
+   if (historyServerProxy != null) {
+     return historyServerProxy;
+   }
+   LOG.warn("Job History Server is not configured.");
+   return getNotRunningJob(applicationReport, state);
+ }
+
+ /**
+  * Builds an {@link MRClientProtocol} RPC proxy for the given address
+  * using this delegate's configuration.
+  *
+  * @param serviceAddr address to create the proxy for
+  * @return the newly created proxy
+  * @throws IOException declared for callers; propagated from the calls
+  *         made here
+  */
+ MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
+     throws IOException {
+   LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
+   MRClientProtocol amProxy = (MRClientProtocol) YarnRPC.create(conf)
+       .getProxy(MRClientProtocol.class, serviceAddr, conf);
+   LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+   return amProxy;
+ }
+
+ private synchronized Object invoke(String method, Class argClass,
+ Object args) throws IOException {
+ Method methodOb = null;
+ try {
+ methodOb = MRClientProtocol.class.getMethod(method, argClass);
+ } catch (SecurityException e) {
+ throw new YarnException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnException("Method name mismatch", e);
+ }
+ int maxRetries = this.conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ IOException lastException = null;
+ while (maxRetries > 0) {
+ try {
+ return methodOb.invoke(getProxy(), args);
+ } catch (YarnRemoteException yre) {
+ LOG.warn("Exception thrown by remote end.", yre);
+ throw yre;
+ } catch (InvocationTargetException e) {
+ if (e.getTargetException() instanceof YarnRemoteException) {
+ LOG.warn("Error from remote end: " + e
+ .getTargetException().getLocalizedMessage());
+ LOG.debug("Tracing remote error ", e.getTargetException());
+ throw (YarnRemoteException) e.getTargetException();
+ }
+ LOG.debug("Failed to contact AM/History for job " + jobId +
+ " retrying..", e.getTargetException());
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
+ // HS/AMS shut down
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+
+ } catch (Exception e) {
+ LOG.debug("Failed to contact AM/History for job " + jobId
+ + " Will retry..", e);
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
+ // RM shutdown
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+ }
+ }
+ throw lastException;
+ }
+
+ /**
+  * Requests the job's counters through the "getCounters" protocol call and
+  * converts them with {@code TypeConverter.fromYarn}.
+  *
+  * @param arg0 job whose counters are requested
+  * @return the converted counters
+  * @throws IOException propagated from the protocol call
+  */
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+     InterruptedException {
+   org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+       TypeConverter.toYarn(arg0);
+   GetCountersRequest countersRequest =
+       recordFactory.newRecordInstance(GetCountersRequest.class);
+   countersRequest.setJobId(yarnJobId);
+   GetCountersResponse countersResponse = (GetCountersResponse)
+       invoke("getCounters", GetCountersRequest.class, countersRequest);
+   Counters yarnCounters = countersResponse.getCounters();
+   return TypeConverter.fromYarn(yarnCounters);
+ }
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
+ .toYarn(arg0);
+ GetTaskAttemptCompletionEventsRequest request = recordFactory
+ .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+ request.setJobId(jobID);
+ request.setFromEventId(arg1);
+ request.setMaxEvents(arg2);
+ List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
+ ((GetTaskAttemptCompletionEventsResponse) invoke(
+ "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
+ getCompletionEventList();
+ return TypeConverter
+ .fromYarn(list
+ .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+ }
+
+ /**
+  * Requests the diagnostics of a task attempt through the "getDiagnostics"
+  * protocol call.
+  *
+  * @param arg0 task attempt whose diagnostics are requested
+  * @return the diagnostics list of the response, copied into an array in
+  *         list order
+  * @throws IOException propagated from the protocol call
+  */
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+     throws IOException, InterruptedException {
+
+   org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
+       .toYarn(arg0);
+   GetDiagnosticsRequest request = recordFactory
+       .newRecordInstance(GetDiagnosticsRequest.class);
+   request.setTaskAttemptId(attemptID);
+   List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
+       GetDiagnosticsRequest.class, request)).getDiagnosticsList();
+   // Library copy instead of a manual loop; unlike the former
+   // String.toString() call per element, this tolerates a null entry.
+   return list.toArray(new String[list.size()]);
+ }
+
+ /**
+  * Requests the job report through the "getJobReport" protocol call and
+  * converts it into a {@link JobStatus}.
+  *
+  * <p>An empty job file in the report is filled in from
+  * {@code MRApps.getJobFile}. The tracking URL is taken from the report
+  * when it is non-empty and from this delegate's stored tracking URL
+  * otherwise; unless it equals "N/A" it is prefixed with
+  * {@code HttpConfig.getSchemePrefix()}.
+  *
+  * @param oldJobID job whose status is requested
+  * @return the converted status, or {@code null} when the response holds
+  *         no job report
+  * @throws IOException propagated from the protocol call
+  */
+ public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+   org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+       TypeConverter.toYarn(oldJobID);
+   GetJobReportRequest reportRequest =
+       recordFactory.newRecordInstance(GetJobReportRequest.class);
+   reportRequest.setJobId(yarnJobId);
+   GetJobReportResponse reportResponse = (GetJobReportResponse) invoke(
+       "getJobReport", GetJobReportRequest.class, reportRequest);
+   JobReport report = reportResponse.getJobReport();
+   if (report == null) {
+     return null;
+   }
+   if (StringUtils.isEmpty(report.getJobFile())) {
+     report.setJobFile(MRApps.getJobFile(conf, report.getUser(), oldJobID));
+   }
+   String reportedUrl = report.getTrackingUrl();
+   String url;
+   if (StringUtils.isNotEmpty(reportedUrl)) {
+     url = reportedUrl;
+   } else {
+     url = trackingUrl;
+   }
+   if (!UNAVAILABLE.equals(url)) {
+     url = HttpConfig.getSchemePrefix() + url;
+   }
+   return TypeConverter.fromYarn(report, url);
+ }
+
+ /**
+  * Requests the task reports of the given type through the
+  * "getTaskReports" protocol call and converts them with
+  * {@code TypeConverter.fromYarn}.
+  *
+  * @param oldJobID job whose task reports are requested
+  * @param taskType task type sent in the request
+  * @return the converted reports as an array
+  * @throws IOException propagated from the protocol call
+  */
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+     throws IOException{
+   org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+       TypeConverter.toYarn(oldJobID);
+   GetTaskReportsRequest reportsRequest =
+       recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+   reportsRequest.setJobId(yarnJobId);
+   reportsRequest.setTaskType(TypeConverter.toYarn(taskType));
+
+   GetTaskReportsResponse reportsResponse = (GetTaskReportsResponse) invoke(
+       "getTaskReports", GetTaskReportsRequest.class, reportsRequest);
+   List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> yarnReports =
+       reportsResponse.getTaskReportList();
+
+   return TypeConverter.fromYarn(yarnReports)
+       .toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+ }
+
+ /**
+  * Sends either a "failTaskAttempt" or a "killTaskAttempt" protocol call
+  * for the given attempt.
+  *
+  * @param taskAttemptID attempt to act on
+  * @param fail {@code true} to send the fail request, {@code false} to
+  *        send the kill request
+  * @return always {@code true} once the call returned normally
+  * @throws IOException propagated from the protocol call
+  */
+ public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+     throws IOException {
+   org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptId =
+       TypeConverter.toYarn(taskAttemptID);
+   if (!fail) {
+     KillTaskAttemptRequest killRequest =
+         recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+     killRequest.setTaskAttemptId(yarnAttemptId);
+     invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
+     return true;
+   }
+   FailTaskAttemptRequest failRequest =
+       recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+   failRequest.setTaskAttemptId(yarnAttemptId);
+   invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
+   return true;
+ }
+
+ /**
+  * Sends a "killJob" protocol call for the given job.
+  *
+  * @param oldJobID job to kill
+  * @return always {@code true} once the call returned normally
+  * @throws IOException propagated from the protocol call
+  */
+ public boolean killJob(JobID oldJobID)
+     throws IOException {
+   org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+       TypeConverter.toYarn(oldJobID);
+   KillJobRequest request =
+       recordFactory.newRecordInstance(KillJobRequest.class);
+   request.setJobId(yarnJobId);
+   invoke("killJob", KillJobRequest.class, request);
+   return true;
+ }
+
+ /**
+  * Builds the log lookup parameters for a finished job, either for one
+  * task attempt or, when no attempt is given, for the last AM listed in
+  * the job report.
+  *
+  * @param oldJobID job whose logs are wanted
+  * @param oldTaskAttemptID attempt whose logs are wanted, or {@code null}
+  *        to use the last AM of the job
+  * @return container id, application id, node id and user of the selected
+  *         container
+  * @throws IOException when no job report (or attempt report) is
+  *         available, when the report lacks the container or node
+  *         information, or when the job state is not one of SUCCEEDED,
+  *         FAILED, KILLED, ERROR
+  */
+ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+     throws YarnRemoteException, IOException {
+   org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+       TypeConverter.toYarn(oldJobID);
+   GetJobReportRequest request =
+       recordFactory.newRecordInstance(GetJobReportRequest.class);
+   request.setJobId(jobId);
+
+   JobReport report =
+       ((GetJobReportResponse) invoke("getJobReport",
+           GetJobReportRequest.class, request)).getJobReport();
+   // getJobStatus treats a missing report as possible; do the same here
+   // instead of failing with a NullPointerException.
+   if (report == null) {
+     throw new IOException("Unable to get job report for job: " + oldJobID);
+   }
+   if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+       JobState.ERROR).contains(report.getJobState())) {
+     if (oldTaskAttemptID != null) {
+       GetTaskAttemptReportRequest taRequest =
+           recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+       taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+       TaskAttemptReport taReport =
+           ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+               GetTaskAttemptReportRequest.class, taRequest))
+               .getTaskAttemptReport();
+       if (taReport == null
+           || taReport.getContainerId() == null
+           || taReport.getNodeManagerHost() == null) {
+         throw new IOException("Unable to get log information for task: "
+             + oldTaskAttemptID);
+       }
+       return new LogParams(
+           taReport.getContainerId().toString(),
+           taReport.getContainerId().getApplicationAttemptId()
+               .getApplicationId().toString(),
+           BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+               taReport.getNodeManagerPort()).toString(), report.getUser());
+     } else {
+       if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+         throw new IOException("Unable to get log information for job: "
+             + oldJobID);
+       }
+       // The most recent AM is the last entry of the list.
+       AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+       return new LogParams(
+           amInfo.getContainerId().toString(),
+           amInfo.getAppAttemptId().getApplicationId().toString(),
+           BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+               amInfo.getNodeManagerPort()).toString(), report.getUser());
+     }
+   } else {
+     throw new IOException("Cannot get log path for a in-progress job");
+   }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+/**
+ * Local {@link MRClientProtocol} stand-in that answers from a fixed job
+ * state and an application report instead of contacting a remote service.
+ */
+public class NotRunningJob implements MRClientProtocol {
+
+ private final RecordFactory recordFactory =
+     RecordFactoryProvider.getRecordFactory(null);
+
+ private final JobState jobState;
+ private final ApplicationReport applicationReport;
+
+
+ /**
+  * Builds the placeholder report used when no real application report is
+  * available: string fields are "N/A", numeric fields are 0.
+  */
+ private ApplicationReport getUnknownApplicationReport() {
+   ApplicationId unknownAppId = recordFactory
+       .newRecordInstance(ApplicationId.class);
+   ApplicationAttemptId unknownAttemptId = recordFactory
+       .newRecordInstance(ApplicationAttemptId.class);
+
+   // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+   // used for a non running job
+   return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
+       "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
+       "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+ }
+
+ /**
+  * @param applicationReport report to answer from; {@code null} selects
+  *        the placeholder report
+  * @param jobState job state reported by {@code getJobReport}
+  */
+ NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+   if (applicationReport == null) {
+     this.applicationReport = getUnknownApplicationReport();
+   } else {
+     this.applicationReport = applicationReport;
+   }
+   this.jobState = jobState;
+ }
+
+ /** Does nothing and returns a freshly created, empty response. */
+ @Override
+ public FailTaskAttemptResponse failTaskAttempt(
+     FailTaskAttemptRequest request) throws YarnRemoteException {
+   return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+ }
+
+ /** Returns a response whose counters hold an empty group map. */
+ @Override
+ public GetCountersResponse getCounters(GetCountersRequest request)
+     throws YarnRemoteException {
+   GetCountersResponse response =
+       recordFactory.newRecordInstance(GetCountersResponse.class);
+   Counters emptyCounters = recordFactory.newRecordInstance(Counters.class);
+   emptyCounters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+   response.setCounters(emptyCounters);
+   return response;
+ }
+
+ /** Returns a response carrying a single empty diagnostics string. */
+ @Override
+ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+     throws YarnRemoteException {
+   GetDiagnosticsResponse response =
+       recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+   response.addDiagnostics("");
+   return response;
+ }
+
+ /**
+  * Builds a job report from the request's job id, this stub's job state
+  * and the user, name, times, diagnostics and tracking URL of the stored
+  * application report.
+  */
+ @Override
+ public GetJobReportResponse getJobReport(GetJobReportRequest request)
+     throws YarnRemoteException {
+   JobReport stubReport = recordFactory.newRecordInstance(JobReport.class);
+   stubReport.setJobId(request.getJobId());
+   stubReport.setJobState(jobState);
+   stubReport.setUser(applicationReport.getUser());
+   stubReport.setStartTime(applicationReport.getStartTime());
+   stubReport.setDiagnostics(applicationReport.getDiagnostics());
+   stubReport.setJobName(applicationReport.getName());
+   stubReport.setTrackingUrl(applicationReport.getTrackingUrl());
+   stubReport.setFinishTime(applicationReport.getFinishTime());
+
+   GetJobReportResponse response =
+       recordFactory.newRecordInstance(GetJobReportResponse.class);
+   response.setJobReport(stubReport);
+   return response;
+ }
+
+ /** Returns a response to which an empty list of events was added. */
+ @Override
+ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+     GetTaskAttemptCompletionEventsRequest request)
+     throws YarnRemoteException {
+   GetTaskAttemptCompletionEventsResponse response = recordFactory
+       .newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+   ArrayList<TaskAttemptCompletionEvent> noEvents =
+       new ArrayList<TaskAttemptCompletionEvent>();
+   response.addAllCompletionEvents(noEvents);
+   return response;
+ }
+
+ @Override
+ public GetTaskAttemptReportResponse getTaskAttemptReport(
+ GetTaskAttemptReportRequest request) throws YarnRemoteException {
+ // Always throws: this stub holds no per-attempt data. NOTE(review): ClientServiceDelegate.getLogFilePath invokes "getTaskAttemptReport" on its proxy - confirm that path cannot reach a NotRunningJob.
+ throw new NotImplementedException();
+ }
+
+ /**
+  * Returns a placeholder report for the requested task: state NEW, an
+  * empty counter group map and no running attempts.
+  */
+ @Override
+ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+     throws YarnRemoteException {
+   GetTaskReportResponse resp =
+       recordFactory.newRecordInstance(GetTaskReportResponse.class);
+   TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+   report.setTaskId(request.getTaskId());
+   report.setTaskState(TaskState.NEW);
+   Counters counters = recordFactory.newRecordInstance(Counters.class);
+   counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+   report.setCounters(counters);
+   report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+   // Attach the report; it used to be built and then discarded, leaving
+   // the response empty.
+   resp.setTaskReport(report);
+   return resp;
+ }
+
+ /** Returns a response to which an empty list of task reports was added. */
+ @Override
+ public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+     throws YarnRemoteException {
+   GetTaskReportsResponse response =
+       recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+   ArrayList<TaskReport> noReports = new ArrayList<TaskReport>();
+   response.addAllTaskReports(noReports);
+   return response;
+ }
+
+ /** Does nothing and returns a freshly created, empty response. */
+ @Override
+ public KillJobResponse killJob(KillJobRequest request)
+     throws YarnRemoteException {
+   return recordFactory.newRecordInstance(KillJobResponse.class);
+ }
+
+ /** Does nothing and returns a freshly created, empty response. */
+ @Override
+ public KillTaskResponse killTask(KillTaskRequest request)
+     throws YarnRemoteException {
+   return recordFactory.newRecordInstance(KillTaskResponse.class);
+ }
+
+ /** Does nothing and returns a freshly created, empty response. */
+ @Override
+ public KillTaskAttemptResponse killTaskAttempt(
+     KillTaskAttemptRequest request) throws YarnRemoteException {
+   return recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnRemoteException {
+ /* Unsupported on this stub: always throws NotImplementedException. Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnRemoteException {
+ /* Unsupported on this stub: always throws NotImplementedException. Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnRemoteException {
+ /* Unsupported on this stub: always throws NotImplementedException. Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InetSocketAddress getConnectAddress() {
+ /* Unsupported on this stub: always throws NotImplementedException. Should not be invoked by anyone. Normally used to set token service */
+ throw new NotImplementedException();
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate extends YarnClientImpl {
+ private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+
+ private YarnConfiguration conf;
+ private GetNewApplicationResponse application;
+ private ApplicationId applicationId;
+
+ /**
+ * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+ * @param conf the configuration object.
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf) {
+ super();
+ this.conf = conf;
+ init(conf);
+ start();
+ }
+
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnNodes(super.getNodeReports());
+ }
+
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+ }
+
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ // TODO: Implement getBlacklistedTrackers
+ LOG.warn("getBlacklistedTrackers - Not implemented yet");
+ return new TaskTrackerInfo[0];
+ }
+
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+ ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
+ metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+ metrics.getNumNodeManagers(), 0, 0);
+ return oldMetrics;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Token getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ return ProtoUtils.convertFromProtoFormat(
+ super.getRMDelegationToken(renewer), rmAddress);
+ }
+
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ this.application = super.getNewApplication();
+ this.applicationId = this.application.getApplicationId();
+ return TypeConverter.fromYarn(applicationId);
+ }
+
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarn(
+ super.getQueueInfo(queueName), this.conf);
+ }
+
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnQueueUserAclsInfo(super
+ .getQueueAclsInfo());
+ }
+
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+ }
+
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
+ }
+
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+ this.conf);
+ }
+
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+ String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ Path path = MRApps.getStagingAreaDir(conf, user);
+ LOG.debug("getStagingAreaDir: dir=" + path);
+ return path.toString();
+ }
+
+
+ public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+ //FileContext.getFileContext(conf).delete(sysDir, true);
+ return sysDir.toString();
+ }
+
+
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ return;
+ }
+
+
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native