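You are viewing a plain-text version of this content. The canonical (HTML) version is available from the original mailing-list archive page; the hyperlink is not preserved in this plain-text rendering.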
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/26 02:14:04 UTC
[1/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move
YARNRunner to tez-mapreduce project (bikas)
Updated Branches:
refs/heads/master 1c44f634c -> 360d30a9f
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/pom.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/pom.xml b/tez-yarn-client/pom.xml
deleted file mode 100644
index 5b9278b..0000000
--- a/tez-yarn-client/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez</artifactId>
- <version>0.2.0-SNAPSHOT</version>
- </parent>
- <artifactId>tez-yarn-client</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
deleted file mode 100644
index 142fa5d..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-
-public class ClientCache {
-
- private final Configuration conf;
- private final ResourceMgrDelegate rm;
-
- private Map<JobID, ClientServiceDelegate> cache =
- new HashMap<JobID, ClientServiceDelegate>();
-
- public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
- this.conf = conf;
- this.rm = rm;
- }
-
- //TODO: evict from the cache on some threshold
- public synchronized ClientServiceDelegate getClient(JobID jobId) {
- ClientServiceDelegate client = cache.get(jobId);
- if (client == null) {
- client = new ClientServiceDelegate(conf, rm, jobId);
- cache.put(jobId, client);
- }
- return client;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
deleted file mode 100644
index 44b2734..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezConfiguration;
-
-import java.io.IOException;
-
-public class ClientServiceDelegate {
-
- private final TezConfiguration conf;
-
- // FIXME
- // how to handle completed jobs that the RM does not know about?
-
- public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
- JobID jobId) {
- this.conf = new TezConfiguration(conf); // Cloning for modifying.
- // For faster redirects from AM to HS.
- this.conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
- this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
- MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
- }
-
- public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
- throws IOException, InterruptedException {
- // FIXME needs counters support from DAG
- // with a translation layer on client side
- org.apache.hadoop.mapreduce.Counters empty =
- new org.apache.hadoop.mapreduce.Counters();
- return empty;
- }
-
- public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
- int fromEventId, int maxEvents)
- throws IOException, InterruptedException {
- // FIXME seems like there is support in client to query task failure
- // related information
- // However, api does not make sense for DAG
- return new TaskCompletionEvent[0];
- }
-
- public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
- taId)
- throws IOException, InterruptedException {
- // FIXME need support to query task diagnostics?
- return new String[0];
- }
-
- public JobStatus getJobStatus(JobID oldJobID) throws IOException {
- // handled in YARNRunner
- throw new UnsupportedOperationException();
- }
-
- public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
- JobID oldJobID, TaskType taskType)
- throws IOException{
- // TEZ-146: need to return real task reports
- return new org.apache.hadoop.mapreduce.TaskReport[0];
- }
-
- public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
- throws IOException {
- // FIXME need support to kill a task attempt?
- throw new UnsupportedOperationException();
- }
-
- public boolean killJob(JobID oldJobID)
- throws IOException {
- // FIXME need support to kill a dag?
- // Should this be just an RM killApplication?
- // For one dag per AM, RM kill should suffice
- throw new UnsupportedOperationException();
- }
-
- public LogParams getLogFilePath(JobID oldJobID,
- TaskAttemptID oldTaskAttemptID)
- throws YarnException, IOException {
- // FIXME logs for an attempt?
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
deleted file mode 100644
index 0b768c0..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-
-import com.google.common.base.Joiner;
-
-public class DAGJobStatus extends JobStatus {
-
- private final String jobFile;
- private final DAGStatus dagStatus;
- private final ApplicationReport report;
-
- public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
- super();
- this.dagStatus = dagStatus;
- this.jobFile = jobFile;
- this.report = report;
- }
-
- @Override
- protected synchronized void setMapProgress(float p) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setCleanupProgress(float p) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setSetupProgress(float p) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setReduceProgress(float p) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setPriority(JobPriority jp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setFinishTime(long finishTime) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setHistoryFile(String historyFile) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setTrackingUrl(String trackingUrl) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setRetired() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setState(State state) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setStartTime(long startTime) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setUsername(String userName) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setSchedulingInfo(String schedulingInfo) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setQueue(String queue) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected synchronized void setFailureInfo(String failureInfo) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public synchronized String getQueue() {
- return report.getQueue();
- }
-
- @Override
- public synchronized float getMapProgress() {
- if(dagStatus.getVertexProgress() != null) {
- return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
- }
- if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
- return 1.0f;
- }
- return 0.0f;
- }
-
- @Override
- public synchronized float getCleanupProgress() {
- if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
- dagStatus.getState() == DAGStatus.State.KILLED ||
- dagStatus.getState() == DAGStatus.State.ERROR) {
- return 1.0f;
- }
- return 0.0f;
- }
-
- @Override
- public synchronized float getSetupProgress() {
- if (dagStatus.getState() == DAGStatus.State.RUNNING) {
- return 1.0f;
- }
- return 0.0f;
- }
-
- @Override
- public synchronized float getReduceProgress() {
- if(dagStatus.getVertexProgress() != null) {
- return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
- }
- if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
- return 1.0f;
- }
- return 0.0f;
- }
-
- @Override
- public synchronized State getState() {
- switch (dagStatus.getState()) {
- case SUBMITTED:
- case INITING:
- return State.PREP;
- case RUNNING:
- return State.RUNNING;
- case SUCCEEDED:
- return State.SUCCEEDED;
- case KILLED:
- return State.KILLED;
- case FAILED:
- case ERROR:
- return State.FAILED;
- default:
- throw new TezUncheckedException("Unknown value of DAGState.State:"
- + dagStatus.getState());
- }
- }
-
- @Override
- public synchronized long getStartTime() {
- return report.getStartTime();
- }
-
- @Override
- public JobID getJobID() {
- return TypeConverter.fromYarn(report.getApplicationId());
- }
-
- @Override
- public synchronized String getUsername() {
- return report.getUser();
- }
-
- @Override
- public synchronized String getSchedulingInfo() {
- return report.getTrackingUrl();
- }
-
- @Override
- public synchronized Map<JobACL, AccessControlList> getJobACLs() {
- // TODO Auto-generated method stub
- return super.getJobACLs();
- }
-
- @Override
- public synchronized JobPriority getPriority() {
- // TEX-147: return real priority
- return JobPriority.NORMAL;
- }
-
- @Override
- public synchronized String getFailureInfo() {
- return Joiner.on(". ").join(dagStatus.getDiagnostics());
- }
-
- @Override
- public synchronized boolean isJobComplete() {
- return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
- dagStatus.getState() == DAGStatus.State.KILLED ||
- dagStatus.getState() == DAGStatus.State.ERROR);
- }
-
- @Override
- public synchronized void write(DataOutput out) throws IOException {
- // FIXME
- }
-
- @Override
- public synchronized void readFields(DataInput in) throws IOException {
- // FIXME
- }
-
- @Override
- public String getJobName() {
- return report.getName();
- }
-
- @Override
- public String getJobFile() {
- return jobFile;
- }
-
- @Override
- public synchronized String getTrackingUrl() {
- return report.getTrackingUrl();
- }
-
- @Override
- public synchronized long getFinishTime() {
- return report.getFinishTime();
- }
-
- @Override
- public synchronized boolean isRetired() {
- // FIXME handle retired jobs?
- return false;
- }
-
- @Override
- public synchronized String getHistoryFile() {
- // FIXME handle history in status
- return null;
- }
-
- @Override
- public int getNumUsedSlots() {
- return report.getApplicationResourceUsageReport().getNumUsedContainers();
- }
-
- @Override
- public void setNumUsedSlots(int n) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNumReservedSlots() {
- return report.getApplicationResourceUsageReport().
- getNumReservedContainers();
- }
-
- @Override
- public void setNumReservedSlots(int n) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getUsedMem() {
- return report.getApplicationResourceUsageReport().
- getUsedResources().getMemory();
- }
-
- @Override
- public void setUsedMem(int m) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getReservedMem() {
- return report.getApplicationResourceUsageReport().
- getReservedResources().getMemory();
- }
-
- @Override
- public void setReservedMem(int r) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNeededMem() {
- return report.getApplicationResourceUsageReport().
- getNeededResources().getMemory();
- }
-
- @Override
- public void setNeededMem(int n) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public synchronized boolean isUber() {
- return false;
- }
-
- @Override
- public synchronized void setUber(boolean isUber) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toString() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("job-id : " + getJobID());
- buffer.append("uber-mode : " + isUber());
- buffer.append("map-progress : " + getMapProgress());
- buffer.append("reduce-progress : " + getReduceProgress());
- buffer.append("cleanup-progress : " + getCleanupProgress());
- buffer.append("setup-progress : " + getSetupProgress());
- buffer.append("runstate : " + getState());
- buffer.append("start-time : " + getStartTime());
- buffer.append("user-name : " + getUsername());
- buffer.append("priority : " + getPriority());
- buffer.append("scheduling-info : " + getSchedulingInfo());
- buffer.append("num-used-slots" + getNumUsedSlots());
- buffer.append("num-reserved-slots" + getNumReservedSlots());
- buffer.append("used-mem" + getUsedMem());
- buffer.append("reserved-mem" + getReservedMem());
- buffer.append("needed-mem" + getNeededMem());
- return buffer.toString();
- }
-
- private float getProgress(String vertexName) {
- Progress progress = dagStatus.getVertexProgress().get(vertexName);
- if(progress == null) {
- // no such stage. return 0 like MR app currently does.
- return 0;
- }
- float totalTasks = (float) progress.getTotalTaskCount();
- if(totalTasks != 0) {
- return progress.getSucceededTaskCount()/totalTasks;
- }
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
deleted file mode 100644
index e649990..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-
-public class NotRunningJob implements MRClientProtocol {
-
- private RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- private final JobState jobState;
- private final ApplicationReport applicationReport;
-
-
- private ApplicationReport getUnknownApplicationReport() {
- ApplicationId unknownAppId = recordFactory
- .newRecordInstance(ApplicationId.class);
- ApplicationAttemptId unknownAttemptId = recordFactory
- .newRecordInstance(ApplicationAttemptId.class);
-
- // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
- // used for a non running job
- return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A",
- "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A",
- 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "TEZ_MRR", null);
- }
-
- NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
- this.applicationReport =
- (applicationReport == null) ?
- getUnknownApplicationReport() : applicationReport;
- this.jobState = jobState;
- }
-
- @Override
- public FailTaskAttemptResponse failTaskAttempt(
- FailTaskAttemptRequest request) throws IOException {
- FailTaskAttemptResponse resp =
- recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
- return resp;
- }
-
- @Override
- public GetCountersResponse getCounters(GetCountersRequest request)
- throws IOException {
- GetCountersResponse resp =
- recordFactory.newRecordInstance(GetCountersResponse.class);
- Counters counters = recordFactory.newRecordInstance(Counters.class);
- counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
- resp.setCounters(counters);
- return resp;
- }
-
- @Override
- public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
- throws IOException {
- GetDiagnosticsResponse resp =
- recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
- resp.addDiagnostics("");
- return resp;
- }
-
- @Override
- public GetJobReportResponse getJobReport(GetJobReportRequest request)
- throws IOException {
- JobReport jobReport =
- recordFactory.newRecordInstance(JobReport.class);
- jobReport.setJobId(request.getJobId());
- jobReport.setJobState(jobState);
- jobReport.setUser(applicationReport.getUser());
- jobReport.setStartTime(applicationReport.getStartTime());
- jobReport.setDiagnostics(applicationReport.getDiagnostics());
- jobReport.setJobName(applicationReport.getName());
- jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
- jobReport.setFinishTime(applicationReport.getFinishTime());
-
- GetJobReportResponse resp =
- recordFactory.newRecordInstance(GetJobReportResponse.class);
- resp.setJobReport(jobReport);
- return resp;
- }
-
- @Override
- public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
- GetTaskAttemptCompletionEventsRequest request)
- throws IOException {
- GetTaskAttemptCompletionEventsResponse resp =
- recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
- resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
- return resp;
- }
-
- @Override
- public GetTaskAttemptReportResponse getTaskAttemptReport(
- GetTaskAttemptReportRequest request) throws IOException {
- //not invoked by anybody
- throw new NotImplementedException();
- }
-
- @Override
- public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
- throws IOException {
- GetTaskReportResponse resp =
- recordFactory.newRecordInstance(GetTaskReportResponse.class);
- TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
- report.setTaskId(request.getTaskId());
- report.setTaskState(TaskState.NEW);
- Counters counters = recordFactory.newRecordInstance(Counters.class);
- counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
- report.setCounters(counters);
- report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
- return resp;
- }
-
- @Override
- public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
- throws IOException {
- GetTaskReportsResponse resp =
- recordFactory.newRecordInstance(GetTaskReportsResponse.class);
- resp.addAllTaskReports(new ArrayList<TaskReport>());
- return resp;
- }
-
- @Override
- public KillJobResponse killJob(KillJobRequest request)
- throws IOException {
- KillJobResponse resp =
- recordFactory.newRecordInstance(KillJobResponse.class);
- return resp;
- }
-
- @Override
- public KillTaskResponse killTask(KillTaskRequest request)
- throws IOException {
- KillTaskResponse resp =
- recordFactory.newRecordInstance(KillTaskResponse.class);
- return resp;
- }
-
- @Override
- public KillTaskAttemptResponse killTaskAttempt(
- KillTaskAttemptRequest request) throws IOException {
- KillTaskAttemptResponse resp =
- recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
- return resp;
- }
-
- @Override
- public GetDelegationTokenResponse getDelegationToken(
- GetDelegationTokenRequest request) throws IOException {
- /* Should not be invoked by anyone. */
- throw new NotImplementedException();
- }
-
- @Override
- public RenewDelegationTokenResponse renewDelegationToken(
- RenewDelegationTokenRequest request) throws IOException {
- /* Should not be invoked by anyone. */
- throw new NotImplementedException();
- }
-
- @Override
- public CancelDelegationTokenResponse cancelDelegationToken(
- CancelDelegationTokenRequest request) throws IOException {
- /* Should not be invoked by anyone. */
- throw new NotImplementedException();
- }
-
- @Override
- public InetSocketAddress getConnectAddress() {
- /* Should not be invoked by anyone. Normally used to set token service */
- throw new NotImplementedException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
deleted file mode 100644
index 0e767b4..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.QueueAclsInfo;
-import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-/**
- * Thin wrapper around a {@link YarnClient} that answers cluster, queue,
- * job-id and delegation-token queries, converting YARN records into their
- * MapReduce equivalents via {@link TypeConverter}.
- */
-public class ResourceMgrDelegate {
- private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
-
- private final YarnConfiguration conf;
- private final YarnClient client;
- /** RM address; passed along when converting delegation tokens. */
- private final InetSocketAddress rmAddress;
- /** Set by {@link #getNewJobID()}; null until that method is called. */
- private GetNewApplicationResponse application;
- /** Set by {@link #getNewJobID()}; null until that method is called. */
- private ApplicationId applicationId;
-
- /**
- * Delegate responsible for communicating with the Resource Manager's
- * {@link ApplicationClientProtocol}. Creates, initializes and starts a
- * {@link YarnClient}; call {@link #close()} to stop it when done.
- * @param conf the configuration object.
- */
- public ResourceMgrDelegate(YarnConfiguration conf) {
- this.conf = conf;
- this.client = YarnClient.createYarnClient();
- this.client.init(conf);
- this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- this.client.start();
- }
-
- /**
- * @return the cluster's node reports converted to MR tracker info.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- public TaskTrackerInfo[] getActiveTrackers() throws IOException,
- InterruptedException {
- try {
- return TypeConverter.fromYarnNodes(client.getNodeReports());
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * @return status of every application of the Tez application type.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- public JobStatus[] getAllJobs() throws IOException, InterruptedException {
- try {
- Set<String> appTypes = new HashSet<String>(1);
- appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
- return TypeConverter.fromYarnApps(client.getApplications(appTypes),
- this.conf);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** Not implemented yet: logs a warning and returns an empty array. */
- public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
- InterruptedException {
- // TODO: Implement getBlacklistedTrackers
- LOG.warn("getBlacklistedTrackers - Not implemented yet");
- return new TaskTrackerInfo[0];
- }
-
- /**
- * Builds MR cluster metrics from YARN cluster metrics. Only the
- * NodeManager count comes from YARN; the remaining values are constants
- * or are scaled from that count.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- public ClusterMetrics getClusterMetrics() throws IOException,
- InterruptedException {
- YarnClusterMetrics metrics;
- try {
- metrics = client.getYarnClusterMetrics();
- } catch (YarnException e) {
- throw new IOException(e);
- }
- ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
- metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
- metrics.getNumNodeManagers(), 0, 0);
- return oldMetrics;
- }
-
- /**
- * Fetches an RM delegation token and converts it to a Hadoop token.
- * @param renewer the designated renewer of the token.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- @SuppressWarnings("rawtypes")
- public Token getDelegationToken(Text renewer) throws IOException,
- InterruptedException {
- try {
- // Remove rmAddress after YARN-868 is addressed
- return ConverterUtils.convertFromYarn(
- client.getRMDelegationToken(renewer), rmAddress);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @return the URI of the default file system for the configuration. */
- public String getFilesystemName() throws IOException, InterruptedException {
- return FileSystem.get(conf).getUri().toString();
- }
-
- /**
- * Asks the RM for a new application and remembers its id, which is later
- * available through {@link #getApplicationId()}.
- * @return the new application id converted to an MR {@link JobID}.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- public JobID getNewJobID() throws IOException, InterruptedException {
- try {
- this.application =
- client.createApplication().getNewApplicationResponse();
- } catch (YarnException e) {
- throw new IOException(e);
- }
- this.applicationId = this.application.getApplicationId();
- return TypeConverter.fromYarn(applicationId);
- }
-
- /** @throws IOException wrapping any {@link YarnException} from the RM. */
- public QueueInfo getQueue(String queueName) throws IOException,
- InterruptedException {
- try {
- return TypeConverter.fromYarn(
- client.getQueueInfo(queueName), this.conf);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @throws IOException wrapping any {@link YarnException} from the RM. */
- public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
- InterruptedException {
- try {
- return TypeConverter.fromYarnQueueUserAclsInfo(
- client.getQueueAclsInfo());
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @throws IOException wrapping any {@link YarnException} from the RM. */
- public QueueInfo[] getQueues() throws IOException, InterruptedException {
- try {
- return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @throws IOException wrapping any {@link YarnException} from the RM. */
- public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
- try {
- return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
- this.conf);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @throws IOException wrapping any {@link YarnException} from the RM. */
- public QueueInfo[] getChildQueues(String parent) throws IOException,
- InterruptedException {
- try {
- return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
- this.conf);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** @return the MR staging directory for the current user. */
- public String getStagingAreaDir() throws IOException, InterruptedException {
- String user =
- UserGroupInformation.getCurrentUser().getShortUserName();
- Path path = MRApps.getStagingAreaDir(conf, user);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getStagingAreaDir: dir=" + path);
- }
- return path.toString();
- }
-
- /** @return the (relative) job submit directory as the system dir. */
- public String getSystemDir() throws IOException, InterruptedException {
- Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
- return sysDir.toString();
- }
-
- /** Not applicable on YARN; always returns 0. */
- public long getTaskTrackerExpiryInterval() throws IOException,
- InterruptedException {
- return 0;
- }
-
- /** Not supported; intentionally a no-op. */
- public void setJobPriority(JobID arg0, String arg1) throws IOException,
- InterruptedException {
- }
-
- /** Always returns 0. */
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return 0;
- }
-
- /** @return the id obtained by the last {@link #getNewJobID()}, or null. */
- public ApplicationId getApplicationId() {
- return applicationId;
- }
-
- /** Asks the RM to kill the given application. */
- public void killApplication(ApplicationId appId)
- throws YarnException, IOException {
- client.killApplication(appId);
- }
-
- /**
- * Stops the {@link YarnClient} started by the constructor, releasing its
- * resources. The delegate must not be used after this call.
- */
- public void close() {
- client.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
deleted file mode 100644
index 96f6936..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ /dev/null
@@ -1,722 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.QueueAclsInfo;
-import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Client-side {@link ClientProtocol} implementation that enables the current
- * JobClient (0.22 hadoop) to run on YARN-TEZ.
- * <p>
- * Submitted MR jobs are translated into a Tez {@link DAG} (one vertex per
- * stage, consecutive stages joined by scatter-gather edges) and submitted via
- * {@link TezClient}. Cluster, queue and token queries are delegated to
- * {@link ResourceMgrDelegate}; per-task queries go through {@link ClientCache}.
- */
-@SuppressWarnings({ "unchecked" })
-public class YARNRunner implements ClientProtocol {
-
- private static final Log LOG = LogFactory.getLog(YARNRunner.class);
-
- private ResourceMgrDelegate resMgrDelegate;
- private ClientCache clientCache;
- private Configuration conf;
- private final FileContext defaultFileContext;
-
- final public static FsPermission DAG_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644);
- final public static int UTF8_CHUNK_SIZE = 16 * 1024;
-
- private final TezConfiguration tezConf;
- private final TezClient tezClient;
-
- /** DAG client used by {@link #getJobStatus(JobID)}, created lazily. */
- private DAGClient dagClient;
- /** Application for which {@link #dagClient} was created. */
- private ApplicationId dagClientAppId;
-
- /**
- * Yarn runner encapsulates the client interface of
- * yarn
- * @param conf the configuration object for the client
- */
- public YARNRunner(Configuration conf) {
- this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
- }
-
- /**
- * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
- * {@link ResourceMgrDelegate}. Enables mocking and testing.
- * @param conf the configuration object for the client
- * @param resMgrDelegate the resourcemanager client handle.
- */
- public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
- this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
- }
-
- /**
- * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
- * but allowing injecting {@link ClientCache}. Enable mocking and testing.
- * @param conf the configuration object
- * @param resMgrDelegate the resource manager delegate
- * @param clientCache the client cache object.
- */
- public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
- ClientCache clientCache) {
- this.conf = conf;
- this.tezConf = new TezConfiguration(conf);
- try {
- this.resMgrDelegate = resMgrDelegate;
- this.tezClient = new TezClient(tezConf);
- this.clientCache = clientCache;
- this.defaultFileContext = FileContext.getFileContext(this.conf);
- } catch (UnsupportedFileSystemException ufe) {
- throw new RuntimeException("Error in instantiating YarnClient", ufe);
- }
- }
-
- /**
- * Used for testing mostly.
- * @param resMgrDelegate the resource manager delegate to set to.
- */
- @VisibleForTesting
- @Private
- public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
- this.resMgrDelegate = resMgrDelegate;
- }
-
- /** Not supported by this client; always throws. */
- @Override
- public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
- throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Use Token.cancel instead");
- }
-
- @Override
- public TaskTrackerInfo[] getActiveTrackers() throws IOException,
- InterruptedException {
- return resMgrDelegate.getActiveTrackers();
- }
-
- @Override
- public JobStatus[] getAllJobs() throws IOException, InterruptedException {
- return resMgrDelegate.getAllJobs();
- }
-
- @Override
- public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
- InterruptedException {
- return resMgrDelegate.getBlacklistedTrackers();
- }
-
- @Override
- public ClusterMetrics getClusterMetrics() throws IOException,
- InterruptedException {
- return resMgrDelegate.getClusterMetrics();
- }
-
- @Override
- public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
- throws IOException, InterruptedException {
- // The token is only used for serialization. So the type information
- // mismatch should be fine.
- return resMgrDelegate.getDelegationToken(renewer);
- }
-
- @Override
- public String getFilesystemName() throws IOException, InterruptedException {
- return resMgrDelegate.getFilesystemName();
- }
-
- @Override
- public JobID getNewJobID() throws IOException, InterruptedException {
- return resMgrDelegate.getNewJobID();
- }
-
- @Override
- public QueueInfo getQueue(String queueName) throws IOException,
- InterruptedException {
- return resMgrDelegate.getQueue(queueName);
- }
-
- @Override
- public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
- InterruptedException {
- return resMgrDelegate.getQueueAclsForCurrentUser();
- }
-
- @Override
- public QueueInfo[] getQueues() throws IOException, InterruptedException {
- return resMgrDelegate.getQueues();
- }
-
- @Override
- public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
- return resMgrDelegate.getRootQueues();
- }
-
- @Override
- public QueueInfo[] getChildQueues(String parent) throws IOException,
- InterruptedException {
- return resMgrDelegate.getChildQueues(parent);
- }
-
- @Override
- public String getStagingAreaDir() throws IOException, InterruptedException {
- return resMgrDelegate.getStagingAreaDir();
- }
-
- @Override
- public String getSystemDir() throws IOException, InterruptedException {
- return resMgrDelegate.getSystemDir();
- }
-
- @Override
- public long getTaskTrackerExpiryInterval() throws IOException,
- InterruptedException {
- return resMgrDelegate.getTaskTrackerExpiryInterval();
- }
-
- /**
- * Builds the local resources shared by the AM and all tasks: job.xml, the
- * job jar (if any), the split files and the distributed-cache entries.
- */
- private Map<String, LocalResource> createJobLocalResources(
- Configuration jobConf, String jobSubmitDir)
- throws IOException {
-
- // Setup LocalResources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
-
- Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
-
- URL yarnUrlForJobSubmitDir = ConverterUtils
- .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
- .resolvePath(
- defaultFileContext.makeQualified(new Path(jobSubmitDir))));
- LOG.debug("Creating setup context, jobSubmitDir url is "
- + yarnUrlForJobSubmitDir);
-
- localResources.put(MRJobConfig.JOB_CONF_FILE,
- createApplicationResource(defaultFileContext,
- jobConfPath, LocalResourceType.FILE));
- if (jobConf.get(MRJobConfig.JAR) != null) {
- Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
- LocalResource rc = createApplicationResource(defaultFileContext,
- jobJarPath,
- LocalResourceType.FILE);
- // FIXME fix pattern support
- // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
- // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
- // rc.setPattern(pattern);
- localResources.put(MRJobConfig.JOB_JAR, rc);
- } else {
- // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
- // mapreduce jar itself which is already on the classpath.
- LOG.info("Job jar is not present. "
- + "Not adding any jar to the list of resources.");
- }
-
- // TODO gross hack
- for (String s : new String[] {
- MRJobConfig.JOB_SPLIT,
- MRJobConfig.JOB_SPLIT_METAINFO}) {
- localResources.put(s,
- createApplicationResource(defaultFileContext,
- new Path(jobSubmitDir, s), LocalResourceType.FILE));
- }
-
- MRApps.setupDistributedCache(jobConf, localResources);
-
- return localResources;
- }
-
- // FIXME isn't this a nice mess of a client?
- // read input, write splits, read splits again
- /** Reads the split meta-info and turns each split's hosts into a hint. */
- private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
- FileSystem fs, Configuration conf,
- String jobSubmitDir) throws IOException {
- TaskSplitMetaInfo[] splitsInfo =
- SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
- new Path(jobSubmitDir));
- List<TaskLocationHint> locationHints =
- new ArrayList<TaskLocationHint>(splitsInfo.length);
- for (TaskSplitMetaInfo splitInfo : splitsInfo) {
- locationHints.add(new TaskLocationHint(
- new HashSet<String>(Arrays.asList(splitInfo.getLocations())), null));
- }
- return locationHints;
- }
-
- /**
- * Warns about -Djava.library.path in the map or reduce java opts and fills
- * {@code environment} with the MR task environment.
- */
- private void setupMapReduceEnv(Configuration jobConf,
- Map<String, String> environment, boolean isMap) throws IOException {
-
- if (isMap) {
- warnForJavaLibPath(
- jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
- "map",
- MRJobConfig.MAP_JAVA_OPTS,
- MRJobConfig.MAP_ENV);
- warnForJavaLibPath(
- jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
- "map",
- MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
- MRJobConfig.MAPRED_ADMIN_USER_ENV);
- } else {
- warnForJavaLibPath(
- jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
- "reduce",
- MRJobConfig.REDUCE_JAVA_OPTS,
- MRJobConfig.REDUCE_ENV);
- warnForJavaLibPath(
- jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
- "reduce",
- MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
- MRJobConfig.MAPRED_ADMIN_USER_ENV);
- }
-
- MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
- }
-
- /**
- * Creates the vertex for one stage. Stage 0 is the map vertex (with MR
- * input); later stages are reduce vertices; the last stage gets MR output.
- * @param stageNum stage index, from 0 to totalStages - 1.
- */
- private Vertex createVertexForStage(Configuration stageConf,
- Map<String, LocalResource> jobLocalResources,
- List<TaskLocationHint> locations, int stageNum, int totalStages)
- throws IOException {
- boolean isMap = (stageNum == 0);
-
- int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
- : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
- String processorName = isMap ? MapProcessor.class.getName()
- : ReduceProcessor.class.getName();
- String vertexName;
- if (isMap) {
- vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
- } else if (stageNum == totalStages - 1) {
- vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
- } else {
- vertexName = MultiStageMRConfigUtil
- .getIntermediateStageVertexName(stageNum);
- }
-
- Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
- : MRHelpers.getReduceResource(stageConf);
- byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
- Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
- setUserPayload(vertexUserPayload),
- numTasks, taskResource);
- if (isMap) {
- byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
- MRHelpers.addMRInput(vertex, mapInputPayload, null);
- }
- // The last stage writes the job output (this is also the map vertex for
- // map-only jobs).
- if (stageNum == totalStages - 1) {
- MRHelpers.addMROutput(vertex, vertexUserPayload);
- }
-
- Map<String, String> taskEnv = new HashMap<String, String>();
- setupMapReduceEnv(stageConf, taskEnv, isMap);
-
- Map<String, LocalResource> taskLocalResources =
- new TreeMap<String, LocalResource>();
- // PRECOMMIT Remove split localization for reduce tasks if it's being set
- // here
- taskLocalResources.putAll(jobLocalResources);
-
- String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
- : MRHelpers.getReduceJavaOpts(stageConf);
-
- vertex.setTaskEnvironment(taskEnv)
- .setTaskLocalResources(taskLocalResources)
- .setTaskLocationsHint(locations)
- .setJavaOpts(taskJavaOpts);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding vertex to DAG" + ", vertexName="
- + vertex.getVertexName() + ", processor="
- + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
- + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
- + ", resources=" + vertex.getTaskResource()
- // TODO Add localResources and Environment
- );
- }
-
- return vertex;
- }
-
- /**
- * Builds a linear DAG with one vertex per stage configuration. Each pair of
- * consecutive vertices is joined by a scatter-gather edge with a sorted
- * on-file output and a shuffled merged input.
- */
- private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
- String jobSubmitDir, Credentials ts,
- Map<String, LocalResource> jobLocalResources) throws IOException {
-
- String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
- YarnConfiguration.DEFAULT_APPLICATION_NAME);
- DAG dag = new DAG(jobName);
-
- LOG.info("Number of stages: " + stageConfs.length);
-
- List<TaskLocationHint> mapInputLocations =
- getMapLocationHintsFromInputSplits(
- jobId, fs, stageConfs[0], jobSubmitDir);
- // Reduce stages currently get no location hints.
- List<TaskLocationHint> reduceInputLocations = null;
-
- Vertex[] vertices = new Vertex[stageConfs.length];
- for (int i = 0; i < stageConfs.length; i++) {
- vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
- i == 0 ? mapInputLocations : reduceInputLocations, i,
- stageConfs.length);
- }
-
- for (int i = 0; i < vertices.length; i++) {
- dag.addVertex(vertices[i]);
- if (i > 0) {
- EdgeProperty edgeProperty = new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(OnFileSortedOutput.class.getName()),
- new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
- dag.addEdge(new Edge(vertices[i - 1], vertices[i], edgeProperty));
- }
- }
- return dag;
- }
-
- /**
- * Copies the Tez configuration and moves every deprecated MR key that is
- * set onto its Tez (DAG) equivalent, unsetting the MR key.
- */
- private TezConfiguration getDAGAMConfFromMRConf() {
- TezConfiguration finalConf = new TezConfiguration(this.tezConf);
- Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
- .getMRToDAGParamMap();
-
- for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
- if (finalConf.get(entry.getKey()) != null) {
- finalConf.set(entry.getValue(), finalConf.get(entry.getKey()));
- finalConf.unset(entry.getKey());
- if (LOG.isDebugEnabled()) {
- LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
- + " to Tez key: " + entry.getValue() + " with value "
- + finalConf.get(entry.getValue()));
- }
- }
- }
- return finalConf;
- }
-
- /**
- * Translates the MR job into a Tez DAG and submits it under the
- * application id cached by the {@link ResourceMgrDelegate}.
- * @return the job status right after submission.
- * @throws IOException wrapping any {@link TezException} from submission.
- */
- @Override
- public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
- throws IOException, InterruptedException {
-
- ApplicationId appId = resMgrDelegate.getApplicationId();
-
- FileSystem fs = FileSystem.get(conf);
- // Loads the job.xml written by the user.
- JobConf jobConf = new JobConf(new TezConfiguration(conf));
-
- // Extract individual raw MR configs.
- Configuration[] stageConfs = MultiStageMRConfToTezTranslator
- .getStageConfs(jobConf);
-
- // Transform all confs to use Tez keys
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
- null);
- for (int i = 1; i < stageConfs.length; i++) {
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
- stageConfs[i - 1]);
- }
-
- // create inputs to tezClient.submit()
-
- // FIXME set up job resources
- Map<String, LocalResource> jobLocalResources =
- createJobLocalResources(stageConfs[0], jobSubmitDir);
-
- // FIXME createDAG should take the tezConf as a parameter, instead of using
- // MR keys.
- DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
- jobLocalResources);
-
- List<String> vargs = new LinkedList<String>();
- // admin command opts and user command opts
- String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
- MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
- warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
- MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
- vargs.add(mrAppMasterAdminOptions);
-
- // Add AM user command opts
- String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
- MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
- warnForJavaLibPath(mrAppMasterUserOptions, "app master",
- MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
- vargs.add(mrAppMasterUserOptions);
-
- StringBuilder javaOpts = new StringBuilder();
- for (String varg : vargs) {
- javaOpts.append(varg).append(" ");
- }
-
- // Setup the CLASSPATH in environment
- // i.e. add { Hadoop jars, job jar, CWD } to classpath.
- Map<String, String> environment = new HashMap<String, String>();
-
- // Setup the environment variables for AM
- MRHelpers.updateEnvironmentForMRAM(conf, environment);
-
- TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
- dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
-
- // Submit to ResourceManager
- try {
- dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
- jobSubmitDir);
- AMConfiguration amConfig = new AMConfiguration(
- jobConf.get(JobContext.QUEUE_NAME,
- YarnConfiguration.DEFAULT_QUEUE_NAME),
- environment,
- jobLocalResources, dagAMConf, ts);
- tezClient.submitDAGApplication(appId, dag, amConfig);
- } catch (TezException e) {
- throw new IOException(e);
- }
-
- return getJobStatus(jobId);
- }
-
- /** Describes file {@code p} as an APPLICATION-visibility local resource. */
- private LocalResource createApplicationResource(FileContext fs, Path p,
- LocalResourceType type) throws IOException {
- LocalResource rsrc = Records.newRecord(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-
- @Override
- public void setJobPriority(JobID arg0, String arg1) throws IOException,
- InterruptedException {
- resMgrDelegate.setJobPriority(arg0, arg1);
- }
-
- @Override
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return resMgrDelegate.getProtocolVersion(arg0, arg1);
- }
-
- /** Not supported by this client; always throws. */
- @Override
- public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
- throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Use Token.renew instead");
- }
-
- @Override
- public Counters getJobCounters(JobID arg0) throws IOException,
- InterruptedException {
- return clientCache.getClient(arg0).getJobCounters(arg0);
- }
-
- @Override
- public String getJobHistoryDir() throws IOException, InterruptedException {
- return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
- }
-
- /**
- * Reports the job's status from its DAG. The cached {@link DAGClient} is
- * rebuilt whenever a different job (application) is queried, so a status
- * is never reported for the wrong application.
- * @throws IOException wrapping any {@link TezException}.
- */
- @Override
- public JobStatus getJobStatus(JobID jobID) throws IOException,
- InterruptedException {
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- String jobFile = MRApps.getJobFile(conf, user, jobID);
- ApplicationId appId = TypeConverter.toYarn(jobID).getAppId();
- try {
- if (dagClient == null || !appId.equals(dagClientAppId)) {
- dagClient = tezClient.getDAGClient(appId);
- dagClientAppId = appId;
- }
- DAGStatus dagStatus = dagClient.getDAGStatus();
- return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus,
- jobFile);
- } catch (TezException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
- int arg2) throws IOException, InterruptedException {
- return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
- }
-
- @Override
- public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
- InterruptedException {
- return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
- }
-
- @Override
- public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
- throws IOException, InterruptedException {
- return clientCache.getClient(jobID)
- .getTaskReports(jobID, taskType);
- }
-
- /**
- * Kills the job through the RM, but only while it is PREP or RUNNING; jobs
- * in any other state are left untouched.
- * @throws IOException wrapping any {@link YarnException} from the RM.
- */
- @Override
- public void killJob(JobID arg0) throws IOException, InterruptedException {
- JobStatus.State state = getJobStatus(arg0).getState();
- if (state == JobStatus.State.RUNNING || state == JobStatus.State.PREP) {
- try {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
- }
-
- @Override
- public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
- InterruptedException {
- return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
- }
-
- /** Always returns an ACL that allows everyone ("*"). */
- @Override
- public AccessControlList getQueueAdmins(String arg0) throws IOException {
- return new AccessControlList("*");
- }
-
- /** Always reports RUNNING; there is no JobTracker on YARN. */
- @Override
- public JobTrackerStatus getJobTrackerStatus() throws IOException,
- InterruptedException {
- return JobTrackerStatus.RUNNING;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
- clientMethodsHash);
- }
-
- @Override
- public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
- throws IOException {
- try {
- return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
- } catch (YarnException e) {
- throw new IOException(e);
- }
- }
-
- /** Logs a warning if {@code opts} sets -Djava.library.path. */
- private static void warnForJavaLibPath(String opts, String component,
- String javaConf, String envConf) {
- if (opts != null && opts.contains("-Djava.library.path")) {
- LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
- "programs to no longer function if hadoop native libraries " +
- "are used. These values should be set as part of the " +
- "LD_LIBRARY_PATH in the " + component + " JVM env using " +
- envConf + " config settings.");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
deleted file mode 100644
index 680939b..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-
-public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
-
- @Override
- public ClientProtocol create(Configuration conf) throws IOException {
- if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
- return new YARNRunner(conf);
- }
- return null;
- }
-
- @Override
- public ClientProtocol create(InetSocketAddress addr, Configuration conf)
- throws IOException {
- return create(conf);
- }
-
- @Override
- public void close(ClientProtocol clientProtocol) throws IOException {
- // nothing to do
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
deleted file mode 100644
index 88816ca..0000000
--- a/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
+++ /dev/null
@@ -1,14 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.tez.mapreduce.YarnTezClientProtocolProvider
[3/3] git commit: TEZ-581. Rename MiniMRRTezCluster to
MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)
Posted by bi...@apache.org.
TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/360d30a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/360d30a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/360d30a9
Branch: refs/heads/master
Commit: 360d30a9fcad3a412901b48d85596b863b73f0d7
Parents: 1c44f63
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Oct 25 17:10:19 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Oct 25 17:10:19 2013 -0700
----------------------------------------------------------------------
pom.xml | 10 +-
tez-dist/pom.xml | 8 +-
tez-mapreduce-tests/pom.xml | 98 ---
.../apache/tez/mapreduce/MiniMRRTezCluster.java | 172 -----
.../org/apache/tez/mapreduce/TestMRRJobs.java | 358 ---------
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 532 --------------
.../tez/mapreduce/client/ClientCache.java | 50 ++
.../mapreduce/client/ClientServiceDelegate.java | 105 +++
.../tez/mapreduce/client/DAGJobStatus.java | 386 ++++++++++
.../tez/mapreduce/client/NotRunningJob.java | 240 ++++++
.../mapreduce/client/ResourceMgrDelegate.java | 232 ++++++
.../apache/tez/mapreduce/client/YARNRunner.java | 722 +++++++++++++++++++
.../client/YarnTezClientProtocolProvider.java | 50 ++
...op.mapreduce.protocol.ClientProtocolProvider | 14 +
tez-tests/pom.xml | 93 +++
.../org/apache/tez/mapreduce/TestMRRJobs.java | 359 +++++++++
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 533 ++++++++++++++
.../org/apache/tez/test/MiniTezCluster.java | 172 +++++
tez-yarn-client/findbugs-exclude.xml | 16 -
tez-yarn-client/pom.xml | 72 --
.../org/apache/tez/mapreduce/ClientCache.java | 50 --
.../tez/mapreduce/ClientServiceDelegate.java | 105 ---
.../org/apache/tez/mapreduce/DAGJobStatus.java | 386 ----------
.../org/apache/tez/mapreduce/NotRunningJob.java | 240 ------
.../tez/mapreduce/ResourceMgrDelegate.java | 232 ------
.../org/apache/tez/mapreduce/YARNRunner.java | 722 -------------------
.../YarnTezClientProtocolProvider.java | 50 --
...op.mapreduce.protocol.ClientProtocolProvider | 14 -
28 files changed, 2959 insertions(+), 3062 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3addf16..34cefb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce-tests</artifactId>
+ <artifactId>tez-tests</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -129,11 +129,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-yarn-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
@@ -259,10 +254,9 @@
<module>tez-common</module>
<module>tez-runtime-library</module>
<module>tez-runtime-internals</module>
- <module>tez-yarn-client</module>
<module>tez-mapreduce</module>
<module>tez-mapreduce-examples</module>
- <module>tez-mapreduce-tests</module>
+ <module>tez-tests</module>
<module>tez-dag</module>
<module>tez-dist</module>
<module>docs</module>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index e946f78..dc72598 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -30,12 +30,6 @@
<packaging>pom</packaging>
<dependencies>
- <!--tez-yarn-client should require all other modules to be built before it, so this becomes the last -->
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-yarn-client</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-dag</artifactId>
@@ -48,7 +42,7 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce-tests</artifactId>
+ <artifactId>tez-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/pom.xml b/tez-mapreduce-tests/pom.xml
deleted file mode 100644
index 4ffed02..0000000
--- a/tez-mapreduce-tests/pom.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez</artifactId>
- <version>0.2.0-SNAPSHOT</version>
- </parent>
- <artifactId>tez-mapreduce-tests</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-dag</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-yarn-client</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce-examples</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
deleted file mode 100644
index 278e124..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-/**
- * Configures and starts the Tez-specific components in the YARN cluster.
- *
- * When using this mini cluster, the user is expected to
- */
-public class MiniMRRTezCluster extends MiniYARNCluster {
-
- public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
-
- private static final Log LOG = LogFactory.getLog(MiniMRRTezCluster.class);
-
- private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
-
- private Path confFilePath;
-
- public MiniMRRTezCluster(String testName) {
- this(testName, 1);
- }
-
- public MiniMRRTezCluster(String testName, int noOfNMs) {
- super(testName, noOfNMs, 4, 4);
- }
-
- public MiniMRRTezCluster(String testName, int noOfNMs,
- int numLocalDirs, int numLogDirs) {
- super(testName, noOfNMs, numLocalDirs, numLogDirs);
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
- if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
- conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
- "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
- }
-
- if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
- // nothing defined. set quick delete value
- conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
- }
-
- File appJarFile = new File(MiniMRRTezCluster.APPJAR);
-
- if (!appJarFile.exists()) {
- String message = "TezAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Exiting.";
- LOG.info(message);
- throw new TezUncheckedException(message);
- } else {
- conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
- LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
- }
-
- // VMEM monitoring disabled, PMEM monitoring enabled.
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
-
- conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
-
- try {
- Path stagingPath = FileContext.getFileContext(conf).makeQualified(
- new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
- FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
- if (fc.util().exists(stagingPath)) {
- LOG.info(stagingPath + " exists! deleting...");
- fc.delete(stagingPath, true);
- }
- LOG.info("mkdir: " + stagingPath);
- fc.mkdir(stagingPath, null, true);
-
- //mkdir done directory as well
- String doneDir =
- JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
- Path doneDirPath = fc.makeQualified(new Path(doneDir));
- fc.mkdir(doneDirPath, null, true);
- } catch (IOException e) {
- throw new TezUncheckedException("Could not create staging directory. ", e);
- }
- conf.set(MRConfig.MASTER_ADDRESS, "test");
-
- //configure the shuffle service in NM
- conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
- new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
- conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
- ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
- Service.class);
-
- // Non-standard shuffle port
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
-
- conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
- DefaultContainerExecutor.class, ContainerExecutor.class);
-
- // TestMRJobs is for testing non-uberized operation only; see TestUberAM
- // for corresponding uberized tests.
- conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
- super.serviceInit(conf);
- }
-
- @Override
- public void serviceStart() throws Exception {
- super.serviceStart();
- File workDir = super.getTestWorkDir();
- Configuration conf = super.getConfig();
-
- confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
- File confFile = new File(confFilePath.toString());
- try {
- confFile.createNewFile();
- conf.writeXml(new FileOutputStream(confFile));
- confFile.deleteOnExit();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- confFilePath = new Path(confFile.getAbsolutePath());
- conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- workDir.getAbsolutePath(), System.getProperty("java.class.path"));
- LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
- + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
- }
-
- public Path getConfigFilePath() {
- return confFilePath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
deleted file mode 100644
index b85ddb6..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.RandomTextWriterJob;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.mapreduce.examples.MRRSleepJob;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestMRRJobs {
-
- private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
-
- protected static MiniMRRTezCluster mrrTezCluster;
- protected static MiniDFSCluster dfsCluster;
-
- private static Configuration conf = new Configuration();
- private static FileSystem remoteFs;
-
- private static String TEST_ROOT_DIR = "target"
- + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
-
- private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
- TestMRRJobs.class.getSimpleName();
-
- @BeforeClass
- public static void setup() throws IOException {
- try {
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
- dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .format(true).racks(null).build();
- remoteFs = dfsCluster.getFileSystem();
- } catch (IOException io) {
- throw new RuntimeException("problem starting mini dfs cluster", io);
- }
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- if (mrrTezCluster == null) {
- mrrTezCluster = new MiniMRRTezCluster(TestMRRJobs.class.getName(), 1,
- 1, 1);
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
- conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
- conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
- mrrTezCluster.init(conf);
- mrrTezCluster.start();
- }
-
- }
-
- @AfterClass
- public static void tearDown() {
- if (mrrTezCluster != null) {
- mrrTezCluster.stop();
- mrrTezCluster = null;
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
- }
-
- @Test (timeout = 60000)
- public void testMRRSleepJob() throws IOException, InterruptedException,
- ClassNotFoundException {
- LOG.info("\n\n\nStarting testMRRSleepJob().");
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
- MRRSleepJob sleepJob = new MRRSleepJob();
- sleepJob.setConf(sleepConf);
-
- Job job = sleepJob.createJob(1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1);
-
- job.setJarByClass(MRRSleepJob.class);
- job.setMaxMapAttempts(1); // speed up failures
- job.submit();
- String trackingUrl = job.getTrackingURL();
- String jobId = job.getJobID().toString();
- boolean succeeded = job.waitForCompletion(true);
- Assert.assertTrue(succeeded);
- Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
- Assert.assertTrue("Tracking URL was " + trackingUrl +
- " but didn't Match Job ID " + jobId ,
- trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
- // FIXME once counters and task progress can be obtained properly
- // TODO use dag client to test counters and task progress?
- // what about completed jobs?
- }
-
- @Test (timeout = 60000)
- public void testRandomWriter() throws IOException, InterruptedException,
- ClassNotFoundException {
-
- LOG.info("\n\n\nStarting testRandomWriter().");
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
- mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
- mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
- Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
- Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setSpeculativeExecution(false);
- job.setJarByClass(RandomTextWriterJob.class);
- job.setMaxMapAttempts(1); // speed up failures
- job.submit();
- String trackingUrl = job.getTrackingURL();
- String jobId = job.getJobID().toString();
- boolean succeeded = job.waitForCompletion(true);
- Assert.assertTrue(succeeded);
- Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
- Assert.assertTrue("Tracking URL was " + trackingUrl +
- " but didn't Match Job ID " + jobId ,
- trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
- // Make sure there are three files in the output-dir
-
- RemoteIterator<FileStatus> iterator =
- FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
- outputDir);
- int count = 0;
- while (iterator.hasNext()) {
- FileStatus file = iterator.next();
- if (!file.getPath().getName()
- .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
- count++;
- }
- }
- Assert.assertEquals("Number of part files is wrong!", 3, count);
-
- }
-
-
- @Test (timeout = 60000)
- public void testFailingJob() throws IOException, InterruptedException,
- ClassNotFoundException {
-
- LOG.info("\n\n\nStarting testFailingJob().");
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
- MRRSleepJob sleepJob = new MRRSleepJob();
- sleepJob.setConf(sleepConf);
-
- Job job = sleepJob.createJob(1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1);
-
- job.setJarByClass(MRRSleepJob.class);
- job.setMaxMapAttempts(1); // speed up failures
- job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
- job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
-
- job.submit();
- boolean succeeded = job.waitForCompletion(true);
- Assert.assertFalse(succeeded);
- Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
-
- // FIXME once counters and task progress can be obtained properly
- // TODO verify failed task diagnostics
- }
-
- @Test (timeout = 60000)
- public void testFailingAttempt() throws IOException, InterruptedException,
- ClassNotFoundException {
-
- LOG.info("\n\n\nStarting testFailingAttempt().");
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
- MRRSleepJob sleepJob = new MRRSleepJob();
- sleepJob.setConf(sleepConf);
-
- Job job = sleepJob.createJob(1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1);
-
- job.setJarByClass(MRRSleepJob.class);
- job.setMaxMapAttempts(3); // speed up failures
- job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
- job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
-
- job.submit();
- boolean succeeded = job.waitForCompletion(true);
- Assert.assertTrue(succeeded);
- Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-
- // FIXME once counters and task progress can be obtained properly
- // TODO verify failed task diagnostics
- }
-
- @Test (timeout = 60000)
- public void testMRRSleepJobWithCompression() throws IOException,
- InterruptedException, ClassNotFoundException {
- LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
- + " not found. Not running test.");
- return;
- }
-
- Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
- MRRSleepJob sleepJob = new MRRSleepJob();
- sleepJob.setConf(sleepConf);
-
- Job job = sleepJob.createJob(1, 1, 2, 1, 1,
- 1, 1, 1, 1, 1);
-
- job.setJarByClass(MRRSleepJob.class);
- job.setMaxMapAttempts(1); // speed up failures
-
- // enable compression
- job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
- job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
- DefaultCodec.class.getName());
-
- job.submit();
- String trackingUrl = job.getTrackingURL();
- String jobId = job.getJobID().toString();
- boolean succeeded = job.waitForCompletion(true);
- Assert.assertTrue(succeeded);
- Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
- Assert.assertTrue("Tracking URL was " + trackingUrl +
- " but didn't Match Job ID " + jobId ,
- trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
- // FIXME once counters and task progress can be obtained properly
- // TODO use dag client to test counters and task progress?
- // what about completed jobs?
-
- }
-
-
- /*
- //@Test (timeout = 60000)
- public void testMRRSleepJobWithSecurityOn() throws IOException,
- InterruptedException, ClassNotFoundException {
-
- LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
-
- if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
- return;
- }
-
- mrrTezCluster.getConfig().set(
- CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "kerberos");
- mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
- mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
- mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
- "rm/sightbusy-lx@LOCALHOST");
- mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
- "nm/sightbusy-lx@LOCALHOST");
-
- UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
-
- // Keep it in here instead of after RM/NM as multiple user logins happen in
- // the same JVM.
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- LOG.info("User name is " + user.getUserName());
- for (Token<? extends TokenIdentifier> str : user.getTokens()) {
- LOG.info("Token is " + str.encodeToUrlString());
- }
- user.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- MRRSleepJob sleepJob = new MRRSleepJob();
- sleepJob.setConf(mrrTezCluster.getConfig());
- Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
- // //Job with reduces
- // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
- job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
- job.submit();
- String trackingUrl = job.getTrackingURL();
- String jobId = job.getJobID().toString();
- job.waitForCompletion(true);
- Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
- Assert.assertTrue("Tracking URL was " + trackingUrl +
- " but didn't Match Job ID " + jobId ,
- trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
- return null;
- }
- });
-
- // TODO later: add explicit "isUber()" checks of some sort
- }
- */
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
deleted file mode 100644
index a6dbe26..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
-import org.apache.tez.client.TezSessionStatus;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.examples.MRRSleepJob;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Integration tests that build a three-vertex MRR sleep DAG
- * (map -> ireduce -> reduce) and run it on a MiniMRRTezCluster whose default
- * filesystem is a MiniDFSCluster. Covers direct DAG submission, submission
- * through a TezSession (including reusing one session for several DAGs),
- * killing a running DAG, shutting a session down before any DAG is submitted,
- * and generating input splits in the AM.
- */
-public class TestMRRJobsDAGApi {
-
- private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
-
- protected static MiniMRRTezCluster mrrTezCluster;
- protected static MiniDFSCluster dfsCluster;
-
- // Shared configuration: setup() sets the MiniDFSCluster base dir on it, and
- // testMRRSleepJobDagSubmitCore passes it to ensureStagingDirExists.
- private static Configuration conf = new Configuration();
- private static FileSystem remoteFs;
-
- // Base directory for the MiniDFSCluster's storage, under target/.
- private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
- + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
-
- /**
- * Starts a two-datanode MiniDFSCluster, then (if not already started) a
- * MiniMRRTezCluster whose fs.defaultFS points at that HDFS instance.
- * An IOException while starting DFS is rethrown as a RuntimeException.
- */
- @BeforeClass
- public static void setup() throws IOException {
- try {
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
- dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .format(true).racks(null).build();
- remoteFs = dfsCluster.getFileSystem();
- } catch (IOException io) {
- throw new RuntimeException("problem starting mini dfs cluster", io);
- }
-
- if (mrrTezCluster == null) {
- mrrTezCluster = new MiniMRRTezCluster(TestMRRJobsDAGApi.class.getName(),
- 1, 1, 1);
- // NOTE(review): this local 'conf' shadows the static field of the same name.
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
- mrrTezCluster.init(conf);
- mrrTezCluster.start();
- }
-
- }
-
- /** Stops the Tez and DFS mini clusters, if started, and clears the references. */
- @AfterClass
- public static void tearDown() {
- if (mrrTezCluster != null) {
- mrrTezCluster.stop();
- mrrTezCluster = null;
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
- // TODO Add cleanup code.
- }
-
- // Submits the 3-vertex (map -> ireduce -> reduce) sleep DAG directly through
- // TezClient.submitDAGApplication (no session) and expects SUCCEEDED.
- @Test(timeout = 60000)
- public void testMRRSleepJobDagSubmit() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
-
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
- // TODO Add additional checks for tracking URL etc. - once it's exposed by
- // the DAG API.
- }
-
- // Submits the 3-vertex sleep DAG without a session, then kills it with
- // DAGClient.tryKillDAG once it reports RUNNING. Expects KILLED.
- @Test(timeout = 60000)
- public void testMRRSleepJobDagSubmitAndKill() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
-
- Assert.assertEquals(DAGStatus.State.KILLED, finalState);
- // TODO Add additional checks for tracking URL etc. - once it's exposed by
- // the DAG API.
- }
-
- // Starts a new TezSession and submits the DAG to the running AM via RPC.
- // Expects SUCCEEDED.
- @Test(timeout = 60000)
- public void testMRRSleepJobViaSession() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
-
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
- }
-
- // Runs two DAGs back to back on a single TezSession created here.
- // Checks the session status transitions (INITIALIZING -> READY -> SHUTDOWN)
- // and that the YARN application finishes with SUCCEEDED after stop().
- @Test(timeout = 120000)
- public void testMultipleMRRSleepJobViaSession() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- Map<String, String> commonEnv = createCommonEnv();
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
- .valueOf(new Random().nextInt(100000))));
- remoteFs.mkdirs(remoteStagingDir);
- TezConfiguration tezConf = new TezConfiguration(
- mrrTezCluster.getConfig());
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
- remoteStagingDir.toString());
-
- Map<String, LocalResource> amLocalResources =
- new HashMap<String, LocalResource>();
-
- AMConfiguration amConfig = new AMConfiguration(
- "default", commonEnv, amLocalResources,
- tezConf, null);
- TezSessionConfiguration tezSessionConfig =
- new TezSessionConfiguration(amConfig, tezConf);
- TezSession tezSession = new TezSession("testsession", tezSessionConfig);
- tezSession.start();
- Assert.assertEquals(TezSessionStatus.INITIALIZING,
- tezSession.getSessionStatus());
-
- State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
- tezSession, false);
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
- Assert.assertEquals(TezSessionStatus.READY,
- tezSession.getSessionStatus());
- finalState = testMRRSleepJobDagSubmitCore(true, false, false,
- tezSession, false);
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
- Assert.assertEquals(TezSessionStatus.READY,
- tezSession.getSessionStatus());
-
- ApplicationId appId = tezSession.getApplicationId();
- tezSession.stop();
- Assert.assertEquals(TezSessionStatus.SHUTDOWN,
- tezSession.getSessionStatus());
-
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(mrrTezCluster.getConfig());
- yarnClient.start();
-
- // Wait until the application reaches a terminal YARN state.
- // NOTE(review): this loop calls getApplicationReport with no sleep between
- // polls (busy-wait against the RM), and yarnClient is never stopped here.
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport.getYarnApplicationState().equals(
- YarnApplicationState.FINISHED)
- || appReport.getYarnApplicationState().equals(
- YarnApplicationState.FAILED)
- || appReport.getYarnApplicationState().equals(
- YarnApplicationState.KILLED)) {
- break;
- }
- }
-
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- Assert.assertEquals(YarnApplicationState.FINISHED,
- appReport.getYarnApplicationState());
- Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
- appReport.getFinalApplicationStatus());
- }
-
- // Submits the 3-vertex sleep DAG through a new TezSession, then stops the
- // session once the DAG reports RUNNING. Expects KILLED.
- @Test(timeout = 60000)
- public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
-
- Assert.assertEquals(DAGStatus.State.KILLED, finalState);
- // TODO Add additional checks for tracking URL etc. - once it's exposed by
- // the DAG API.
- }
-
- // Create and close a tez session without submitting a job. The session is
- // stopped once its application is RUNNING, and the app must end FINISHED
- // with SUCCEEDED.
- @Test(timeout = 60000)
- public void testTezSessionShutdown() throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
- testMRRSleepJobDagSubmitCore(true, false, true, false);
- }
-
- // Runs the DAG through a new session, with input splits generated in the AM
- // (MRInputAMSplitGenerator) instead of on the client.
- // NOTE(review): the returned DAG state is not asserted.
- @Test(timeout = 60000)
- public void testAMSplitGeneration() throws IOException, InterruptedException,
- TezException, ClassNotFoundException, YarnException {
- testMRRSleepJobDagSubmitCore(true, false, false, true);
- }
-
- /**
- * Convenience overload of the five-argument variant. It never reuses an
- * existing session: it passes null for reUseTezSession.
- */
- public State testMRRSleepJobDagSubmitCore(
- boolean dagViaRPC,
- boolean killDagWhileRunning,
- boolean closeSessionBeforeSubmit,
- boolean genSplitsInAM) throws IOException,
- InterruptedException, TezException, ClassNotFoundException,
- YarnException {
- return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
- closeSessionBeforeSubmit, null, genSplitsInAM);
- }
-
- /**
- * Returns a new, empty environment map. It is used for the AM and for every
- * vertex.
- */
- private Map<String, String> createCommonEnv() {
- Map<String, String> commonEnv = new HashMap<String, String>();
- return commonEnv;
- }
-
- /**
- * Builds the map -> ireduce -> reduce sleep DAG and runs it to completion.
- *
- * @param dagViaRPC submit through a TezSession instead of
- * TezClient.submitDAGApplication
- * @param killDagWhileRunning once the DAG reports RUNNING, stop the session
- * (when dagViaRPC) or call DAGClient.tryKillDAG (otherwise)
- * @param closeSessionBeforeSubmit only with dagViaRPC: stop the session as
- * soon as its application is RUNNING, assert the app finished
- * successfully, and return without submitting a DAG
- * @param reUseTezSession existing session to submit to, or null to start a
- * new one. Only consulted when dagViaRPC is true. A reused session is not
- * stopped at the end, unless killDagWhileRunning stops it.
- * @param genSplitsInAM generate input splits in the AM instead of on the
- * client
- * @return the final DAG state, or null when the session was closed before
- * submission
- */
- public State testMRRSleepJobDagSubmitCore(
- boolean dagViaRPC,
- boolean killDagWhileRunning,
- boolean closeSessionBeforeSubmit,
- TezSession reUseTezSession,
- boolean genSplitsInAM) throws IOException,
- InterruptedException, TezException, ClassNotFoundException,
- YarnException {
- LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
-
- JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
- JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
- JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
-
- stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
- stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
- stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
- stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
- stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
- IntWritable.class.getName());
- stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
- IntWritable.class.getName());
- stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
- SleepInputFormat.class.getName());
- stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
- MRRSleepJobPartitioner.class.getName());
-
- stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
- stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
- stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
- stage2Conf
- .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
- stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
- IntWritable.class.getName());
- stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
- IntWritable.class.getName());
- stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
- MRRSleepJobPartitioner.class.getName());
-
- // NOTE(review): stage22Conf (2 reduces) is translated and prepared below,
- // but it is never attached to any vertex in this DAG.
- JobConf stage22Conf = new JobConf(stage2Conf);
- stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
-
- stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
- stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
- stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
- stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
- stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
- IntWritable.class.getName());
- stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
- IntWritable.class.getName());
- stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
- NullOutputFormat.class.getName());
-
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
- stage1Conf);
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
- stage1Conf);
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
- stage2Conf); // this also works stage22 as it sets up keys etc
-
- MRHelpers.doJobClientMagic(stage1Conf);
- MRHelpers.doJobClientMagic(stage2Conf);
- MRHelpers.doJobClientMagic(stage22Conf);
- MRHelpers.doJobClientMagic(stage3Conf);
-
- // Random staging directory under /tmp on the MiniDFSCluster filesystem.
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
- .valueOf(new Random().nextInt(100000))));
- TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
- InputSplitInfo inputSplitInfo = null;
- if (!genSplitsInAM) {
- inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
- remoteStagingDir);
- }
-
- byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
- byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
- byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
-
- DAG dag = new DAG("testMRRSleepJobDagSubmit");
- // Presumably -1 defers the map task count to the AM-side split
- // initializer when genSplitsInAM is set. TODO confirm.
- int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
- Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
- : null;
- Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
- MapProcessor.class.getName()).setUserPayload(stage1Payload),
- stage1NumTasks, Resource.newInstance(256, 1));
- MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
- Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
- ReduceProcessor.class.getName()).setUserPayload(
- MRHelpers.createUserPayloadFromConf(stage2Conf)),
- 1, Resource.newInstance(256, 1));
- Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
- ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
- 1, Resource.newInstance(256, 1));
- MRHelpers.addMROutput(stage3Vertex, stage3Payload);
-
- Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
- Map<String, String> commonEnv = createCommonEnv();
-
- // With client-side splits, ship the split and split-meta files to the map
- // tasks as local resources, and use the splits' task location hints.
- if (!genSplitsInAM) {
- // TODO Use utility method post TEZ-205.
- Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
- stage1LocalResources.put(
- inputSplitInfo.getSplitsFile().getName(),
- createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- stage1LocalResources.put(
- inputSplitInfo.getSplitsMetaInfoFile().getName(),
- createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- stage1LocalResources.putAll(commonLocalResources);
-
- stage1Vertex.setTaskLocalResources(stage1LocalResources);
- stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
- } else {
- stage1Vertex.setTaskLocalResources(commonLocalResources);
- }
-
- stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
- stage1Vertex.setTaskEnvironment(commonEnv);
-
- // TODO env, resources
-
- stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
- stage2Vertex.setTaskLocalResources(commonLocalResources);
- stage2Vertex.setTaskEnvironment(commonEnv);
-
- stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
- stage3Vertex.setTaskLocalResources(commonLocalResources);
- stage3Vertex.setTaskEnvironment(commonEnv);
-
- dag.addVertex(stage1Vertex);
- dag.addVertex(stage2Vertex);
- dag.addVertex(stage3Vertex);
-
- // map -> ireduce and ireduce -> reduce are both sorted scatter-gather
- // (shuffle) edges.
- Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInputLegacy.class.getName())));
- Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInputLegacy.class.getName())));
-
- dag.addEdge(edge1);
- dag.addEdge(edge2);
-
- Map<String, LocalResource> amLocalResources =
- new HashMap<String, LocalResource>();
- amLocalResources.putAll(commonLocalResources);
-
- TezConfiguration tezConf = new TezConfiguration(
- mrrTezCluster.getConfig());
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
- remoteStagingDir.toString());
-
- TezClient tezClient = new TezClient(tezConf);
- DAGClient dagClient = null;
- TezSession tezSession = null;
- boolean reuseSession = reUseTezSession != null;
- TezSessionConfiguration tezSessionConfig;
- AMConfiguration amConfig = new AMConfiguration(
- "default", commonEnv, amLocalResources,
- tezConf, null);
- if(!dagViaRPC) {
- // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
- dagClient = tezClient.submitDAGApplication(dag, amConfig);
- } else {
- if (reuseSession) {
- tezSession = reUseTezSession;
- } else {
- tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
- tezSession = new TezSession("testsession", tezSessionConfig);
- tezSession.start();
- }
- }
-
- // Session-shutdown-only path: poll every 500ms until the AM is RUNNING,
- // stop the session, then wait for a terminal state and assert success.
- // NOTE(review): yarnClient is not stopped if an assertion below fails.
- if (dagViaRPC && closeSessionBeforeSubmit) {
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(mrrTezCluster.getConfig());
- yarnClient.start();
- boolean sentKillSession = false;
- while(true) {
- Thread.sleep(500l);
- ApplicationReport appReport =
- yarnClient.getApplicationReport(tezSession.getApplicationId());
- if (appReport == null) {
- continue;
- }
- YarnApplicationState appState = appReport.getYarnApplicationState();
- if (!sentKillSession) {
- if (appState == YarnApplicationState.RUNNING) {
- tezSession.stop();
- sentKillSession = true;
- }
- } else {
- if (appState == YarnApplicationState.FINISHED
- || appState == YarnApplicationState.KILLED
- || appState == YarnApplicationState.FAILED) {
- LOG.info("Application completed after sending session shutdown"
- + ", yarnApplicationState=" + appState
- + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
- Assert.assertEquals(YarnApplicationState.FINISHED,
- appState);
- Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
- appReport.getFinalApplicationStatus());
- break;
- }
- }
- }
- yarnClient.stop();
- return null;
- }
-
- if(dagViaRPC) {
- LOG.info("Submitting dag to tez session with appId="
- + tezSession.getApplicationId());
- dagClient = tezSession.submitDAG(dag);
- Assert.assertEquals(TezSessionStatus.RUNNING,
- tezSession.getSessionStatus());
- }
- // Poll every 500ms until the DAG completes, optionally killing it once it
- // reports RUNNING.
- // NOTE(review): the kill is not latched by a flag, so it may be issued on
- // every poll while the DAG still reports RUNNING.
- DAGStatus dagStatus = dagClient.getDAGStatus();
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms."
- + " Current state: " + dagStatus.getState());
- Thread.sleep(500l);
- if(killDagWhileRunning
- && dagStatus.getState() == DAGStatus.State.RUNNING) {
- LOG.info("Killing running dag/session");
- if (dagViaRPC) {
- tezSession.stop();
- } else {
- dagClient.tryKillDAG();
- }
- }
- dagStatus = dagClient.getDAGStatus();
- }
- // Only stop sessions created by this method; a caller-supplied session
- // stays open for further DAGs.
- if (dagViaRPC && !reuseSession) {
- tezSession.stop();
- }
- return dagStatus.getState();
- }
-
- /**
- * Builds a YARN LocalResource for {@code file}. It uses the resolved path,
- * length and modification time reported by {@code fc}.
- */
- private static LocalResource createLocalResource(FileSystem fc, Path file,
- LocalResourceType type, LocalResourceVisibility visibility)
- throws IOException {
- FileStatus fstat = fc.getFileStatus(file);
- URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
- .getPath()));
- long resourceSize = fstat.getLen();
- long resourceModificationTime = fstat.getModificationTime();
-
- return LocalResource.newInstance(resourceURL, type, visibility,
- resourceSize, resourceModificationTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
new file mode 100644
index 0000000..f7c8e07
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Per-job cache of {@link ClientServiceDelegate} instances, keyed by
+ * {@link JobID}. A delegate is created lazily on the first
+ * {@link #getClient(JobID)} call for a job and is returned on every later call.
+ */
+public class ClientCache {
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+
+  // Guarded by 'this': only accessed from the synchronized getClient().
+  // Entries are never evicted, so the map grows with each distinct JobID
+  // requested.
+  private final Map<JobID, ClientServiceDelegate> cache =
+      new HashMap<JobID, ClientServiceDelegate>();
+
+  /**
+   * @param conf configuration passed to every delegate this cache creates
+   * @param rm resource manager delegate passed to every delegate this cache
+   *     creates
+   */
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  /**
+   * Returns the delegate for {@code jobId}, creating and caching it on first
+   * request.
+   *
+   * @param jobId job to look up; used as the cache key
+   * @return the cached (or newly created) delegate for the job
+   */
+  //TODO: evict from the cache on some threshold
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    ClientServiceDelegate client = cache.get(jobId);
+    if (client == null) {
+      client = new ClientServiceDelegate(conf, rm, jobId);
+      cache.put(jobId, client);
+    }
+    return client;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
new file mode 100644
index 0000000..e0fab1f
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import java.io.IOException;
+
+/**
+ * Client-side stand-in for the MapReduce per-job client service when jobs run
+ * as Tez DAGs. Most operations have no DAG equivalent yet. Query methods
+ * return empty results, and unsupported operations throw
+ * {@link UnsupportedOperationException} with a message naming the operation.
+ */
+public class ClientServiceDelegate {
+
+  // Private copy of the caller's configuration, with the IPC connect-retry
+  // override applied in the constructor.
+  // NOTE(review): not read by any method yet.
+  private final TezConfiguration conf;
+
+  // FIXME
+  // how to handle completed jobs that the RM does not know about?
+
+  /**
+   * @param conf base configuration; copied before being modified
+   * @param rm resource manager delegate (currently unused)
+   * @param jobId job served by this delegate (currently unused)
+   */
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+      JobID jobId) {
+    this.conf = new TezConfiguration(conf); // Cloning for modifying.
+    // For faster redirects from AM to HS.
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+            MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+  }
+
+  /** Returns a new, empty counters object; DAG counters are not exposed yet. */
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
+      throws IOException, InterruptedException {
+    // FIXME needs counters support from DAG
+    // with a translation layer on client side
+    return new org.apache.hadoop.mapreduce.Counters();
+  }
+
+  /** Always returns an empty array; task completion events are not tracked. */
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
+      int fromEventId, int maxEvents)
+      throws IOException, InterruptedException {
+    // FIXME seems like there is support in client to query task failure
+    // related information
+    // However, api does not make sense for DAG
+    return new TaskCompletionEvent[0];
+  }
+
+  /** Always returns an empty array; task diagnostics are not queried. */
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
+      taId)
+      throws IOException, InterruptedException {
+    // FIXME need support to query task diagnostics?
+    return new String[0];
+  }
+
+  /**
+   * Not supported here; job status is handled in YARNRunner.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+    // handled in YARNRunner
+    throw new UnsupportedOperationException(
+        "getJobStatus is handled by YARNRunner, not ClientServiceDelegate");
+  }
+
+  /** Always returns an empty array until real task reports exist (TEZ-146). */
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      JobID oldJobID, TaskType taskType)
+      throws IOException {
+    // TEZ-146: need to return real task reports
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
+  }
+
+  /**
+   * Not supported: task attempts cannot be killed or failed through this
+   * delegate.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+      throws IOException {
+    // FIXME need support to kill a task attempt?
+    throw new UnsupportedOperationException(
+        "killTask is not supported for Tez DAGs: " + taskAttemptID);
+  }
+
+  /**
+   * Not supported: jobs cannot be killed through this delegate.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public boolean killJob(JobID oldJobID)
+      throws IOException {
+    // FIXME need support to kill a dag?
+    // Should this be just an RM killApplication?
+    // For one dag per AM, RM kill should suffice
+    throw new UnsupportedOperationException(
+        "killJob is not supported by ClientServiceDelegate: " + oldJobID);
+  }
+
+  /**
+   * Not supported: log locations for task attempts are not available.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public LogParams getLogFilePath(JobID oldJobID,
+      TaskAttemptID oldTaskAttemptID)
+      throws YarnException, IOException {
+    // FIXME logs for an attempt?
+    throw new UnsupportedOperationException(
+        "getLogFilePath is not supported for Tez DAGs: " + oldJobID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
new file mode 100644
index 0000000..9acd836
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
@@ -0,0 +1,386 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Read-only {@link JobStatus} view derived from a YARN
+ * {@link ApplicationReport} and a Tez {@link DAGStatus}.
+ *
+ * <p>Every mutator inherited from {@link JobStatus} throws
+ * {@link UnsupportedOperationException}; all values are read on demand from
+ * the wrapped report and DAG status.
+ */
+public class DAGJobStatus extends JobStatus {
+
+  private final String jobFile;
+  private final DAGStatus dagStatus;
+  private final ApplicationReport report;
+
+  /**
+   * @param report YARN report of the application running the DAG; supplies
+   *     queue, user, name, times, tracking URL and resource usage
+   * @param dagStatus status of the DAG; supplies state, progress and
+   *     diagnostics
+   * @param jobFile value returned by {@link #getJobFile()}
+   */
+  public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
+    super();
+    this.dagStatus = dagStatus;
+    this.jobFile = jobFile;
+    this.report = report;
+  }
+
+  @Override
+  protected synchronized void setMapProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setCleanupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSetupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setReduceProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setPriority(JobPriority jp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFinishTime(long finishTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setHistoryFile(String historyFile) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setTrackingUrl(String trackingUrl) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setRetired() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setState(State state) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setStartTime(long startTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setUsername(String userName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSchedulingInfo(String schedulingInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setQueue(String queue) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFailureInfo(String failureInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String getQueue() {
+    return report.getQueue();
+  }
+
+  /** Progress of the initial map vertex (see {@link #getStageProgress}). */
+  @Override
+  public synchronized float getMapProgress() {
+    return getStageProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
+  }
+
+  /** 1.0 once the DAG has reached a terminal state, 0.0 otherwise. */
+  @Override
+  public synchronized float getCleanupProgress() {
+    return isTerminalState() ? 1.0f : 0.0f;
+  }
+
+  /**
+   * 1.0 while the DAG is running or after it succeeded, 0.0 otherwise.
+   */
+  @Override
+  public synchronized float getSetupProgress() {
+    // A succeeded DAG necessarily went through setup. FAILED/KILLED/ERROR may
+    // have been reached before setup completed, so those keep reporting 0.
+    DAGStatus.State state = dagStatus.getState();
+    if (state == DAGStatus.State.RUNNING
+        || state == DAGStatus.State.SUCCEEDED) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  /** Progress of the final reduce vertex (see {@link #getStageProgress}). */
+  @Override
+  public synchronized float getReduceProgress() {
+    return getStageProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
+  }
+
+  /**
+   * Maps the DAG state onto the MapReduce job state: SUBMITTED/INITING become
+   * PREP, and ERROR is reported as FAILED.
+   *
+   * @throws TezUncheckedException for a DAG state with no mapping
+   */
+  @Override
+  public synchronized State getState() {
+    switch (dagStatus.getState()) {
+    case SUBMITTED:
+    case INITING:
+      return State.PREP;
+    case RUNNING:
+      return State.RUNNING;
+    case SUCCEEDED:
+      return State.SUCCEEDED;
+    case KILLED:
+      return State.KILLED;
+    case FAILED:
+    case ERROR:
+      return State.FAILED;
+    default:
+      throw new TezUncheckedException("Unknown value of DAGState.State:"
+          + dagStatus.getState());
+    }
+  }
+
+  @Override
+  public synchronized long getStartTime() {
+    return report.getStartTime();
+  }
+
+  /** The job id is derived from the YARN application id. */
+  @Override
+  public JobID getJobID() {
+    return TypeConverter.fromYarn(report.getApplicationId());
+  }
+
+  @Override
+  public synchronized String getUsername() {
+    return report.getUser();
+  }
+
+  /** Reports the application's tracking URL as the scheduling info. */
+  @Override
+  public synchronized String getSchedulingInfo() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    // ACLs are not tracked for DAGs yet; fall back to the base-class value.
+    return super.getJobACLs();
+  }
+
+  @Override
+  public synchronized JobPriority getPriority() {
+    // TEZ-147: return real priority
+    return JobPriority.NORMAL;
+  }
+
+  /**
+   * Joins the DAG diagnostics with ". ", or returns "" when the DAG status
+   * carries no diagnostics list.
+   */
+  @Override
+  public synchronized String getFailureInfo() {
+    // Joiner.join throws NullPointerException on a null iterable.
+    if (dagStatus.getDiagnostics() == null) {
+      return "";
+    }
+    return Joiner.on(". ").join(dagStatus.getDiagnostics());
+  }
+
+  @Override
+  public synchronized boolean isJobComplete() {
+    return isTerminalState();
+  }
+
+  /** Serialization is not supported yet; writes nothing. */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    // FIXME
+  }
+
+  /** Deserialization is not supported yet; reads nothing. */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    // FIXME
+  }
+
+  @Override
+  public String getJobName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getJobFile() {
+    return jobFile;
+  }
+
+  @Override
+  public synchronized String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public synchronized boolean isRetired() {
+    // FIXME handle retired jobs?
+    return false;
+  }
+
+  @Override
+  public synchronized String getHistoryFile() {
+    // FIXME handle history in status
+    return null;
+  }
+
+  /** Number of containers the application currently uses. */
+  @Override
+  public int getNumUsedSlots() {
+    return report.getApplicationResourceUsageReport().getNumUsedContainers();
+  }
+
+  @Override
+  public void setNumUsedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Number of containers reserved for the application. */
+  @Override
+  public int getNumReservedSlots() {
+    return report.getApplicationResourceUsageReport().
+        getNumReservedContainers();
+  }
+
+  @Override
+  public void setNumReservedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getUsedMem() {
+    return report.getApplicationResourceUsageReport().
+        getUsedResources().getMemory();
+  }
+
+  @Override
+  public void setUsedMem(int m) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getReservedMem() {
+    return report.getApplicationResourceUsageReport().
+        getReservedResources().getMemory();
+  }
+
+  @Override
+  public void setReservedMem(int r) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNeededMem() {
+    return report.getApplicationResourceUsageReport().
+        getNeededResources().getMemory();
+  }
+
+  @Override
+  public void setNeededMem(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public synchronized void setUber(boolean isUber) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Human-readable summary with one "name : value" pair per field. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("job-id : ").append(getJobID())
+        .append(", uber-mode : ").append(isUber())
+        .append(", map-progress : ").append(getMapProgress())
+        .append(", reduce-progress : ").append(getReduceProgress())
+        .append(", cleanup-progress : ").append(getCleanupProgress())
+        .append(", setup-progress : ").append(getSetupProgress())
+        .append(", runstate : ").append(getState())
+        .append(", start-time : ").append(getStartTime())
+        .append(", user-name : ").append(getUsername())
+        .append(", priority : ").append(getPriority())
+        .append(", scheduling-info : ").append(getSchedulingInfo())
+        .append(", num-used-slots : ").append(getNumUsedSlots())
+        .append(", num-reserved-slots : ").append(getNumReservedSlots())
+        .append(", used-mem : ").append(getUsedMem())
+        .append(", reserved-mem : ").append(getReservedMem())
+        .append(", needed-mem : ").append(getNeededMem());
+    return sb.toString();
+  }
+
+  /** True when the DAG is SUCCEEDED, FAILED, KILLED or ERROR. */
+  private boolean isTerminalState() {
+    DAGStatus.State state = dagStatus.getState();
+    return state == DAGStatus.State.SUCCEEDED
+        || state == DAGStatus.State.FAILED
+        || state == DAGStatus.State.KILLED
+        || state == DAGStatus.State.ERROR;
+  }
+
+  /**
+   * Progress of the named vertex when per-vertex progress is available.
+   * Otherwise returns 1.0 for a succeeded DAG and 0.0 in any other state.
+   */
+  private float getStageProgress(String vertexName) {
+    if (dagStatus.getVertexProgress() != null) {
+      return getProgress(vertexName);
+    }
+    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 1.0f : 0.0f;
+  }
+
+  /**
+   * Fraction of the vertex's tasks that succeeded; 0 when the vertex is
+   * absent or has no tasks.
+   */
+  private float getProgress(String vertexName) {
+    Progress progress = dagStatus.getVertexProgress().get(vertexName);
+    if (progress == null) {
+      // no such stage. return 0 like MR app currently does.
+      return 0;
+    }
+    float totalTasks = (float) progress.getTotalTaskCount();
+    if (totalTasks != 0) {
+      return progress.getSucceededTaskCount() / totalTasks;
+    }
+    return 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
new file mode 100644
index 0000000..e178948
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
@@ -0,0 +1,240 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * {@link MRClientProtocol} implementation that answers every call from a
+ * fixed {@link JobState} and {@link ApplicationReport}, without contacting an
+ * application master. Queries return empty or placeholder records. Kill and
+ * fail requests are acknowledged with empty responses and no other effect.
+ * The delegation-token calls, {@link #getConnectAddress()} and
+ * {@link #getTaskAttemptReport} throw {@link NotImplementedException}.
+ */
+public class NotRunningJob implements MRClientProtocol {
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private final JobState jobState;
+  private final ApplicationReport applicationReport;
+
+  /**
+   * @param applicationReport report used to fill in job reports; when
+   *     {@code null}, a placeholder report with "N/A" fields is used
+   * @param jobState state returned in every {@link JobReport}
+   */
+  NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+    this.applicationReport =
+        (applicationReport == null)
+            ? getUnknownApplicationReport() : applicationReport;
+    this.jobState = jobState;
+  }
+
+  /** Builds a placeholder report used when no real report is available. */
+  private ApplicationReport getUnknownApplicationReport() {
+    ApplicationId unknownAppId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId unknownAttemptId = recordFactory
+        .newRecordInstance(ApplicationAttemptId.class);
+
+    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+    // used for a non running job
+    return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A",
+        "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A",
+        0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "TEZ_MRR", null);
+  }
+
+  /** Creates a {@link Counters} record with no counter groups. */
+  private Counters newEmptyCounters() {
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    return counters;
+  }
+
+  @Override
+  public FailTaskAttemptResponse failTaskAttempt(
+      FailTaskAttemptRequest request) throws IOException {
+    return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+  }
+
+  @Override
+  public GetCountersResponse getCounters(GetCountersRequest request)
+      throws IOException {
+    GetCountersResponse resp =
+        recordFactory.newRecordInstance(GetCountersResponse.class);
+    resp.setCounters(newEmptyCounters());
+    return resp;
+  }
+
+  @Override
+  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+      throws IOException {
+    GetDiagnosticsResponse resp =
+        recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+    resp.addDiagnostics("");
+    return resp;
+  }
+
+  /**
+   * Returns a job report for the requested job id, filled from the stored
+   * job state and application report.
+   */
+  @Override
+  public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws IOException {
+    JobReport jobReport =
+        recordFactory.newRecordInstance(JobReport.class);
+    jobReport.setJobId(request.getJobId());
+    jobReport.setJobState(jobState);
+    jobReport.setUser(applicationReport.getUser());
+    jobReport.setStartTime(applicationReport.getStartTime());
+    jobReport.setDiagnostics(applicationReport.getDiagnostics());
+    jobReport.setJobName(applicationReport.getName());
+    jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
+    jobReport.setFinishTime(applicationReport.getFinishTime());
+
+    GetJobReportResponse resp =
+        recordFactory.newRecordInstance(GetJobReportResponse.class);
+    resp.setJobReport(jobReport);
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+      GetTaskAttemptCompletionEventsRequest request)
+      throws IOException {
+    GetTaskAttemptCompletionEventsResponse resp =
+        recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+    resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptReportResponse getTaskAttemptReport(
+      GetTaskAttemptReportRequest request) throws IOException {
+    //not invoked by anybody
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Returns a placeholder report for the requested task: state NEW, no
+   * counters and no running attempts.
+   */
+  @Override
+  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+      throws IOException {
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+    report.setTaskId(request.getTaskId());
+    report.setTaskState(TaskState.NEW);
+    report.setCounters(newEmptyCounters());
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+
+    GetTaskReportResponse resp =
+        recordFactory.newRecordInstance(GetTaskReportResponse.class);
+    // The report must be attached, otherwise callers receive an empty response.
+    resp.setTaskReport(report);
+    return resp;
+  }
+
+  @Override
+  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+      throws IOException {
+    GetTaskReportsResponse resp =
+        recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+    resp.addAllTaskReports(new ArrayList<TaskReport>());
+    return resp;
+  }
+
+  @Override
+  public KillJobResponse killJob(KillJobRequest request)
+      throws IOException {
+    return recordFactory.newRecordInstance(KillJobResponse.class);
+  }
+
+  @Override
+  public KillTaskResponse killTask(KillTaskRequest request)
+      throws IOException {
+    return recordFactory.newRecordInstance(KillTaskResponse.class);
+  }
+
+  @Override
+  public KillTaskAttemptResponse killTaskAttempt(
+      KillTaskAttemptRequest request) throws IOException {
+    return recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    /* Should not be invoked by anyone. Normally used to set token service */
+    throw new NotImplementedException();
+  }
+}
[2/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move
YARNRunner to tez-mapreduce project (bikas)
Posted by bi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
new file mode 100644
index 0000000..d3b2c08
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -0,0 +1,232 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate {
+ private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+
+ private YarnConfiguration conf;
+ private GetNewApplicationResponse application;
+ private ApplicationId applicationId;
+ private YarnClient client;
+ private InetSocketAddress rmAddress;
+
+ /**
+ * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
+ * @param conf the configuration object.
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf) {
+ super();
+ this.conf = conf;
+ client = YarnClient.createYarnClient();
+ client.init(conf);
+ this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ client.start();
+ }
+
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnNodes(client.getNodeReports());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ try {
+ Set<String> appTypes = new HashSet<String>(1);
+ appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+ return TypeConverter.fromYarnApps(client.getApplications(appTypes),
+ this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ // TODO: Implement getBlacklistedTrackers
+ LOG.warn("getBlacklistedTrackers - Not implemented yet");
+ return new TaskTrackerInfo[0];
+ }
+
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ YarnClusterMetrics metrics;
+ try {
+ metrics = client.getYarnClusterMetrics();
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
+ metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+ metrics.getNumNodeManagers(), 0, 0);
+ return oldMetrics;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Token getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ try {
+ // Remove rmAddress after YARN-868 is addressed
+ return ConverterUtils.convertFromYarn(
+ client.getRMDelegationToken(renewer), rmAddress);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ try {
+ this.application =
+ client.createApplication().getNewApplicationResponse();
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ this.applicationId = this.application.getApplicationId();
+ return TypeConverter.fromYarn(applicationId);
+ }
+
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarn(
+ client.getQueueInfo(queueName), this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueUserAclsInfo(
+ client.getQueueAclsInfo());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
+ this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
+ this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+ String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ Path path = MRApps.getStagingAreaDir(conf, user);
+ LOG.debug("getStagingAreaDir: dir=" + path);
+ return path.toString();
+ }
+
+
+ public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+ //FileContext.getFileContext(conf).delete(sysDir, true);
+ return sysDir.toString();
+ }
+
+
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ return;
+ }
+
+
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ public void killApplication(ApplicationId appId)
+ throws YarnException, IOException {
+ client.killApplication(appId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
new file mode 100644
index 0000000..6b6dd35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -0,0 +1,722 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ.
+ * <p>
+ * {@link #submitJob} translates the MapReduce job configuration into a Tez
+ * {@link DAG} with one vertex per stage (the first stage runs
+ * {@link MapProcessor}, later stages run {@link ReduceProcessor}) and submits
+ * it through a {@link TezClient}. Cluster and queue queries are forwarded to a
+ * {@link ResourceMgrDelegate}; task-level queries go through the
+ * {@link ClientCache}.
+ */
+@SuppressWarnings({ "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientCache clientCache;
+  private Configuration conf;
+  private final FileContext defaultFileContext;
+
+  public static final FsPermission DAG_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644);
+  public static final int UTF8_CHUNK_SIZE = 16 * 1024;
+
+  private final TezConfiguration tezConf;
+  private final TezClient tezClient;
+
+  // Cached DAG client and the application it was obtained for. The cached
+  // client is only reused for the same application, so status queries for a
+  // different job never report another job's DAG.
+  private DAGClient dagClient;
+  private ApplicationId dagClientAppId;
+
+  /**
+   * Yarn runner encapsulates the client interface of yarn.
+   * Creates a {@link ResourceMgrDelegate} from a {@link YarnConfiguration}
+   * wrapping {@code conf}.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) {
+    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+  }
+
+  /**
+   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
+   * {@link ResourceMgrDelegate}. Enables mocking and testing.
+   * @param conf the configuration object for the client
+   * @param resMgrDelegate the resourcemanager client handle.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+  }
+
+  /**
+   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+   * @param conf the configuration object
+   * @param resMgrDelegate the resource manager delegate
+   * @param clientCache the client cache object.
+   * @throws RuntimeException wrapping an {@link UnsupportedFileSystemException}
+   *     raised while setting up the clients or the default file context
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+      ClientCache clientCache) {
+    this.conf = conf;
+    this.tezConf = new TezConfiguration(conf);
+    try {
+      this.resMgrDelegate = resMgrDelegate;
+      this.tezClient = new TezClient(tezConf);
+      this.clientCache = clientCache;
+      this.defaultFileContext = FileContext.getFileContext(this.conf);
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in instantiating YarnClient", ufe);
+    }
+  }
+
+  /**
+   * Used for testing mostly.
+   * @param resMgrDelegate the resource manager delegate to set to.
+   */
+  @VisibleForTesting
+  @Private
+  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+    this.resMgrDelegate = resMgrDelegate;
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.cancel instead");
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getActiveTrackers();
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException, InterruptedException {
+    // The token is only used for serialization. So the type information
+    // mismatch should be fine.
+    return resMgrDelegate.getDelegationToken(renewer);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(queueName);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(parent);
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
+
+  /**
+   * Builds the local resources handed to both the AM and every vertex. These
+   * are job.xml, the job jar (if {@code MRJobConfig.JAR} is set), the split
+   * and split-meta-info files from {@code jobSubmitDir}, and whatever
+   * {@code MRApps.setupDistributedCache} adds.
+   *
+   * @param jobConf the (first stage) job configuration
+   * @param jobSubmitDir the job submission directory
+   * @return map from resource name to {@link LocalResource}
+   * @throws IOException if a resource's file status cannot be read
+   */
+  private Map<String, LocalResource> createJobLocalResources(
+      Configuration jobConf, String jobSubmitDir)
+      throws IOException {
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+    URL yarnUrlForJobSubmitDir = ConverterUtils
+        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+            .resolvePath(
+                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+    LOG.debug("Creating setup context, jobSubmitDir url is "
+        + yarnUrlForJobSubmitDir);
+
+    localResources.put(MRJobConfig.JOB_CONF_FILE,
+        createApplicationResource(defaultFileContext,
+            jobConfPath, LocalResourceType.FILE));
+    if (jobConf.get(MRJobConfig.JAR) != null) {
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath,
+          LocalResourceType.FILE);
+      // FIXME fix pattern support
+      // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+      // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      // rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
+    } else {
+      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+      // mapreduce jar itself which is already on the classpath.
+      LOG.info("Job jar is not present. "
+          + "Not adding any jar to the list of resources.");
+    }
+
+    // TODO gross hack
+    for (String s : new String[] {
+        MRJobConfig.JOB_SPLIT,
+        MRJobConfig.JOB_SPLIT_METAINFO}) {
+      localResources.put(s,
+          createApplicationResource(defaultFileContext,
+              new Path(jobSubmitDir, s), LocalResourceType.FILE));
+    }
+
+    MRApps.setupDistributedCache(jobConf, localResources);
+
+    return localResources;
+  }
+
+  /**
+   * Reads the split meta info written to {@code jobSubmitDir} and returns one
+   * {@link TaskLocationHint} per split, built from the split's locations.
+   */
+  // FIXME isn't this a nice mess of a client?
+  // read input, write splits, read splits again
+  private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
+      FileSystem fs, Configuration conf,
+      String jobSubmitDir) throws IOException {
+    TaskSplitMetaInfo[] splitsInfo =
+        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
+            new Path(jobSubmitDir));
+    int splitsCount = splitsInfo.length;
+    List<TaskLocationHint> locationHints =
+        new ArrayList<TaskLocationHint>(splitsCount);
+    for (int i = 0; i < splitsCount; ++i) {
+      TaskLocationHint locationHint =
+          new TaskLocationHint(
+              new HashSet<String>(
+                  Arrays.asList(splitsInfo[i].getLocations())), null);
+      locationHints.add(locationHint);
+    }
+    return locationHints;
+  }
+
+  /**
+   * Warns about {@code -Djava.library.path} in the map or reduce java opts
+   * (user and admin) and then fills {@code environment} via
+   * {@code MRHelpers.updateEnvironmentForMRTasks}.
+   */
+  private void setupMapReduceEnv(Configuration jobConf,
+      Map<String, String> environment, boolean isMap) throws IOException {
+
+    if (isMap) {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
+          "map",
+          MRJobConfig.MAP_JAVA_OPTS,
+          MRJobConfig.MAP_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
+          "map",
+          MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    } else {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
+          "reduce",
+          MRJobConfig.REDUCE_JAVA_OPTS,
+          MRJobConfig.REDUCE_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
+          "reduce",
+          MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    }
+
+    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+  }
+
+  /**
+   * Creates the vertex for one stage of the job.
+   * <p>
+   * Stage 0 is the map vertex: it runs {@link MapProcessor} and gets an MR
+   * input. Other stages run {@link ReduceProcessor}. The last stage, which is
+   * also stage 0 for map-only jobs, gets an MR output.
+   *
+   * @param stageConf configuration of this stage
+   * @param jobLocalResources resources copied into every task of the vertex
+   * @param locations task location hints, may be {@code null}
+   * @param stageNum stage index, from 0 to {@code totalStages - 1}
+   * @param totalStages number of stages in the job
+   * @return the configured vertex
+   */
+  private Vertex createVertexForStage(Configuration stageConf,
+      Map<String, LocalResource> jobLocalResources,
+      List<TaskLocationHint> locations, int stageNum, int totalStages)
+      throws IOException {
+    // stageNum starts from 0, goes till numStages - 1
+    boolean isMap = (stageNum == 0);
+    boolean isLastStage = (stageNum == totalStages - 1);
+
+    int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
+        : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    String processorName = isMap ? MapProcessor.class.getName()
+        : ReduceProcessor.class.getName();
+    String vertexName;
+    if (isMap) {
+      vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    } else if (isLastStage) {
+      vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+    } else {
+      vertexName = MultiStageMRConfigUtil
+          .getIntermediateStageVertexName(stageNum);
+    }
+
+    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
+        : MRHelpers.getReduceResource(stageConf);
+    byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+    Vertex vertex = new Vertex(vertexName,
+        new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
+        numTasks, taskResource);
+    if (isMap) {
+      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
+      MRHelpers.addMRInput(vertex, mapInputPayload, null);
+    }
+    // The last stage writes the job output (this includes map-only jobs).
+    if (isLastStage) {
+      MRHelpers.addMROutput(vertex, vertexUserPayload);
+    }
+
+    Map<String, String> taskEnv = new HashMap<String, String>();
+    setupMapReduceEnv(stageConf, taskEnv, isMap);
+
+    Map<String, LocalResource> taskLocalResources =
+        new TreeMap<String, LocalResource>();
+    // PRECOMMIT Remove split localization for reduce tasks if it's being set
+    // here
+    taskLocalResources.putAll(jobLocalResources);
+
+    String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
+        : MRHelpers.getReduceJavaOpts(stageConf);
+
+    vertex.setTaskEnvironment(taskEnv)
+        .setTaskLocalResources(taskLocalResources)
+        .setTaskLocationsHint(locations)
+        .setJavaOpts(taskJavaOpts);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding vertex to DAG" + ", vertexName="
+          + vertex.getVertexName() + ", processor="
+          + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+          + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
+          + ", resources=" + vertex.getTaskResource()
+          // TODO Add localResources and Environment
+          );
+    }
+
+    return vertex;
+  }
+
+  /**
+   * Builds the DAG for the job: one vertex per stage configuration, with each
+   * vertex connected to the next by a scatter-gather edge
+   * ({@link OnFileSortedOutput} to {@link ShuffledMergedInputLegacy}).
+   * Only the map vertex receives location hints (from the input splits).
+   */
+  private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
+      String jobSubmitDir, Credentials ts,
+      Map<String, LocalResource> jobLocalResources) throws IOException {
+
+    String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
+        YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    DAG dag = new DAG(jobName);
+
+    LOG.info("Number of stages: " + stageConfs.length);
+
+    List<TaskLocationHint> mapInputLocations =
+        getMapLocationHintsFromInputSplits(
+            jobId, fs, stageConfs[0], jobSubmitDir);
+    // Reduce-side stages currently get no location hints.
+    List<TaskLocationHint> reduceInputLocations = null;
+
+    Vertex[] vertices = new Vertex[stageConfs.length];
+    for (int i = 0; i < stageConfs.length; i++) {
+      vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
+          i == 0 ? mapInputLocations : reduceInputLocations, i,
+          stageConfs.length);
+    }
+
+    for (int i = 0; i < vertices.length; i++) {
+      dag.addVertex(vertices[i]);
+      if (i > 0) {
+        EdgeProperty edgeProperty = new EdgeProperty(
+            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            new OutputDescriptor(OnFileSortedOutput.class.getName()),
+            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
+
+        Edge edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
+        dag.addEdge(edge);
+      }
+    }
+    return dag;
+  }
+
+  /**
+   * Returns a copy of {@code tezConf} in which every MR key listed in
+   * {@code DeprecatedKeys.getMRToDAGParamMap()} that has a value is renamed
+   * to its DAG key. The MR key is unset after its value is copied.
+   */
+  private TezConfiguration getDAGAMConfFromMRConf() {
+    TezConfiguration finalConf = new TezConfiguration(this.tezConf);
+    Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+        .getMRToDAGParamMap();
+
+    for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+      if (finalConf.get(entry.getKey()) != null) {
+        finalConf.set(entry.getValue(), finalConf.get(entry.getKey()));
+        finalConf.unset(entry.getKey());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+              + " to Tez key: " + entry.getValue() + " with value "
+              + finalConf.get(entry.getValue()));
+        }
+      }
+    }
+    return finalConf;
+  }
+
+  /**
+   * Submits the job as a Tez DAG application.
+   * <p>
+   * The job configuration is split into per-stage configurations and
+   * translated to Tez keys, and a DAG is built from them. The DAG is then
+   * submitted under the resource manager delegate's application id, together
+   * with the AM java opts, environment and local resources.
+   *
+   * @return the status of the submitted job
+   * @throws IOException wrapping any {@link TezException} from submission
+   */
+  @Override
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+      throws IOException, InterruptedException {
+
+    ApplicationId appId = resMgrDelegate.getApplicationId();
+
+    FileSystem fs = FileSystem.get(conf);
+    // Loads the job.xml written by the user.
+    JobConf jobConf = new JobConf(new TezConfiguration(conf));
+
+    // Extract individual raw MR configs.
+    Configuration[] stageConfs = MultiStageMRConfToTezTranslator
+        .getStageConfs(jobConf);
+
+    // Transform all confs to use Tez keys; each stage is translated relative
+    // to the stage before it.
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
+        null);
+    for (int i = 1; i < stageConfs.length; i++) {
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
+          stageConfs[i - 1]);
+    }
+
+    // create inputs to tezClient.submit()
+
+    // FIXME set up job resources
+    Map<String, LocalResource> jobLocalResources =
+        createJobLocalResources(stageConfs[0], jobSubmitDir);
+
+    // FIXME createDAG should take the tezConf as a parameter, instead of using
+    // MR keys.
+    DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
+        jobLocalResources);
+
+    List<String> vargs = new LinkedList<String>();
+    // admin command opts and user command opts
+    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+    vargs.add(mrAppMasterAdminOptions);
+
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+    vargs.add(mrAppMasterUserOptions);
+
+    StringBuilder javaOpts = new StringBuilder();
+    for (String varg : vargs) {
+      javaOpts.append(varg).append(" ");
+    }
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Setup the environment variables for AM
+    MRHelpers.updateEnvironmentForMRAM(conf, environment);
+
+    TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
+    dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
+
+    // Submit to ResourceManager
+    try {
+      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          jobSubmitDir);
+      AMConfiguration amConfig = new AMConfiguration(
+          jobConf.get(JobContext.QUEUE_NAME,
+              YarnConfiguration.DEFAULT_QUEUE_NAME),
+          environment,
+          jobLocalResources, dagAMConf, ts);
+      tezClient.submitDAGApplication(appId, dag, amConfig);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+
+    return getJobStatus(jobId);
+  }
+
+  /**
+   * Describes file {@code p} as an APPLICATION-visibility local resource of
+   * the given type, using its resolved URL, length and modification time.
+   */
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    return rsrc;
+  }
+
+  @Override
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    resMgrDelegate.setJobPriority(arg0, arg1);
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return resMgrDelegate.getProtocolVersion(arg0, arg1);
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+  @Override
+  public Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0).getJobCounters(arg0);
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+  }
+
+  /**
+   * Returns the status of the DAG backing {@code jobID}.
+   * <p>
+   * A {@link DAGClient} is cached per application. It is recreated whenever
+   * the requested job maps to a different application than the cached one,
+   * so that status is never reported for the wrong job.
+   *
+   * @throws IOException wrapping any {@link TezException}
+   */
+  @Override
+  public JobStatus getJobStatus(JobID jobID) throws IOException,
+      InterruptedException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String jobFile = MRApps.getJobFile(conf, user, jobID);
+    ApplicationId appId = TypeConverter.toYarn(jobID).getAppId();
+    try {
+      if (dagClient == null || !appId.equals(dagClientAppId)) {
+        dagClient = tezClient.getDAGClient(appId);
+        dagClientAppId = appId;
+      }
+      DAGStatus dagStatus = dagClient.getDAGStatus();
+      return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus,
+          jobFile);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+      throws IOException, InterruptedException {
+    return clientCache.getClient(jobID)
+        .getTaskReports(jobID, taskType);
+  }
+
+  /**
+   * Kills the job's application through the resource manager delegate, but
+   * only while the job is RUNNING or PREP; otherwise this is a no-op.
+   *
+   * @throws IOException wrapping any {@link YarnException} from the kill
+   */
+  @Override
+  public void killJob(JobID arg0) throws IOException, InterruptedException {
+    JobStatus status = getJobStatus(arg0);
+    if (status.getState() == JobStatus.State.RUNNING ||
+        status.getState() == JobStatus.State.PREP) {
+      try {
+        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+      } catch (YarnException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+  }
+
+  @Override
+  public AccessControlList getQueueAdmins(String arg0) throws IOException {
+    return new AccessControlList("*");
+  }
+
+  @Override
+  public JobTrackerStatus getJobTrackerStatus() throws IOException,
+      InterruptedException {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    try {
+      return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Logs a warning when {@code opts} sets {@code -Djava.library.path}; the
+   * message points the user at {@code envConf} for LD_LIBRARY_PATH instead.
+   */
+  private static void warnForJavaLibPath(String opts, String component,
+      String javaConf, String envConf) {
+    if (opts != null && opts.contains("-Djava.library.path")) {
+      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+          "programs to no longer function if hadoop native libraries " +
+          "are used. These values should be set as part of the " +
+          "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+          envConf + " config settings.");
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
new file mode 100644
index 0000000..c36dc9d
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+/**
+ * {@link ClientProtocolProvider} that supplies a Tez-backed {@link YARNRunner}
+ * when the configured framework name is
+ * {@code MRConfig.YARN_TEZ_FRAMEWORK_NAME}.
+ */
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * @param conf the client configuration
+   * @return a new {@link YARNRunner}, or {@code null} when
+   *     {@code MRConfig.FRAMEWORK_NAME} names a different framework
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    String framework = conf.get(MRConfig.FRAMEWORK_NAME);
+    if (!MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(framework)) {
+      return null;
+    }
+    return new YARNRunner(conf);
+  }
+
+  /**
+   * Same as {@link #create(Configuration)}; the address is not used.
+   */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    return create(conf);
+  }
+
+  /** Intentionally a no-op. */
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..88816ca
--- /dev/null
+++ b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.tez.mapreduce.client.YarnTezClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
new file mode 100644
index 0000000..b3c247a
--- /dev/null
+++ b/tez-tests/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-tests</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce-examples</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
new file mode 100644
index 0000000..f98f392
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -0,0 +1,359 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  // Configuration used only to bring up the MiniDFSCluster.
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target"
+      + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+  private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
+      TestMRRJobs.class.getSimpleName();
+
+  /**
+   * Starts a MiniDFSCluster with two datanodes and, if the Tez application
+   * jar has been built, a MiniTezCluster whose default file system is that
+   * HDFS instance.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+          1, 1);
+      Configuration tezClusterConf = new Configuration();
+      // Allow a single AM attempt. This is a YARN setting, so it belongs on
+      // the configuration the mini YARN/Tez cluster is initialized with, not
+      // on the DFS-only configuration above.
+      tezClusterConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      tezClusterConf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      tezClusterConf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+      mrrTezCluster.init(tezClusterConf);
+      mrrTezCluster.start();
+    }
+  }
+
+  /** Stops the Tez and DFS mini clusters if they were started. */
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  /**
+   * Returns true, after logging why, when the Tez application jar is absent.
+   * Callers return early in that case, so the test is effectively skipped.
+   */
+  private static boolean isAppJarMissing() {
+    if (new File(MiniTezCluster.APPJAR).exists()) {
+      return false;
+    }
+    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+        + " not found. Not running test.");
+    return true;
+  }
+
+  /**
+   * Asserts that the tracking URL ends with the last "_"-delimited component
+   * of the job id (including the underscore) followed by "/".
+   */
+  private static void assertTrackingUrlMatchesJobId(String trackingUrl,
+      String jobId) {
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+        " but didn't Match Job ID " + jobId ,
+        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJob().");
+
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    assertTrackingUrlMatchesJobId(trackingUrl, jobId);
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+  }
+
+  @Test (timeout = 60000)
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testRandomWriter().");
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    // Work on a copy so these job-specific settings do not leak into the
+    // shared cluster configuration used by the other tests in this class.
+    Configuration randomWriterConf =
+        new Configuration(mrrTezCluster.getConfig());
+    randomWriterConf.set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    randomWriterConf.set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    Job job = randomWriterJob.createJob(randomWriterConf);
+    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    assertTrackingUrlMatchesJobId(trackingUrl, jobId);
+
+    // Make sure there are three files in the output-dir, ignoring the
+    // success marker (presumably one part file per map: 3072 / 1024).
+    RemoteIterator<FileStatus> iterator =
+        FileContext.getFileContext(randomWriterConf).listStatus(outputDir);
+    int count = 0;
+    while (iterator.hasNext()) {
+      FileStatus file = iterator.next();
+      if (!file.getPath().getName()
+          .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Number of part files is wrong!", 3, count);
+  }
+
+  @Test (timeout = 60000)
+  public void testFailingJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingJob().");
+
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    // Inject a fatal error into every map task so the whole job fails.
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertFalse(succeeded);
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testFailingAttempt() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingAttempt().");
+
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    // Allow retries so the error injected into map task 0 does not fail the
+    // job; the assertions below expect the job to succeed.
+    job.setMaxMapAttempts(3);
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJobWithCompression() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
+
+    if (isAppJarMissing()) {
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 2, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+
+    // enable compression
+    job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+        DefaultCodec.class.getName());
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    assertTrackingUrlMatchesJobId(trackingUrl, jobId);
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+  }
+
+  // Disabled: this test configures kerberos authentication with fixed
+  // keytabs/principals, so it needs a matching secure environment. Kept for
+  // reference only.
+  /*
+  //@Test (timeout = 60000)
+  public void testMRRSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrrTezCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
+        "nm/sightbusy-lx@LOCALHOST");
+
+    UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRRSleepJob sleepJob = new MRRSleepJob();
+        sleepJob.setConf(mrrTezCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // //Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+        job.submit();
+        String trackingUrl = job.getTrackingURL();
+        String jobId = job.getJobID().toString();
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        Assert.assertTrue("Tracking URL was " + trackingUrl +
+            " but didn't Match Job ID " + jobId ,
+            trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+        return null;
+      }
+    });
+
+    // TODO later: add explicit "isUber()" checks of some sort
+  }
+  */
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
new file mode 100644
index 0000000..1bc7b4d
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobsDAGApi {
+
+ private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
+
+ protected static MiniTezCluster mrrTezCluster;
+ protected static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .format(true).racks(null).build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+
+ if (mrrTezCluster == null) {
+ mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(),
+ 1, 1, 1);
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+ mrrTezCluster.init(conf);
+ mrrTezCluster.start();
+ }
+
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (mrrTezCluster != null) {
+ mrrTezCluster.stop();
+ mrrTezCluster = null;
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ // TODO Add cleanup code.
+ }
+
+ // Submits a simple 5 stage sleep job using the DAG submit API instead of job
+ // client.
+ @Test(timeout = 60000)
+ public void testMRRSleepJobDagSubmit() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
+
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ // TODO Add additional checks for tracking URL etc. - once it's exposed by
+ // the DAG API.
+ }
+
+ // Submits a simple 5 stage sleep job using the DAG submit API. Then kills it.
+ @Test(timeout = 60000)
+ public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
+
+ Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+ // TODO Add additional checks for tracking URL etc. - once it's exposed by
+ // the DAG API.
+ }
+
+ // Submits a DAG to AM via RPC after AM has started
+ @Test(timeout = 60000)
+ public void testMRRSleepJobViaSession() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
+
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ }
+
+ // Submits a DAG to AM via RPC after AM has started
+ @Test(timeout = 120000)
+ public void testMultipleMRRSleepJobViaSession() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ Map<String, String> commonEnv = createCommonEnv();
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+ .valueOf(new Random().nextInt(100000))));
+ remoteFs.mkdirs(remoteStagingDir);
+ TezConfiguration tezConf = new TezConfiguration(
+ mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+
+ Map<String, LocalResource> amLocalResources =
+ new HashMap<String, LocalResource>();
+
+ AMConfiguration amConfig = new AMConfiguration(
+ "default", commonEnv, amLocalResources,
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+ tezSession.start();
+ Assert.assertEquals(TezSessionStatus.INITIALIZING,
+ tezSession.getSessionStatus());
+
+ State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession, false);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
+ finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession, false);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
+
+ ApplicationId appId = tezSession.getApplicationId();
+ tezSession.stop();
+ Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+ tezSession.getSessionStatus());
+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState().equals(
+ YarnApplicationState.FINISHED)
+ || appReport.getYarnApplicationState().equals(
+ YarnApplicationState.FAILED)
+ || appReport.getYarnApplicationState().equals(
+ YarnApplicationState.KILLED)) {
+ break;
+ }
+ }
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appReport.getYarnApplicationState());
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+ appReport.getFinalApplicationStatus());
+ }
+
+ // Submits a simple 5 stage sleep job using tez session. Then kills it.
+ @Test(timeout = 60000)
+ public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
+
+ Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+ // TODO Add additional checks for tracking URL etc. - once it's exposed by
+ // the DAG API.
+ }
+
+ // Create and close a tez session without submitting a job
+ @Test(timeout = 60000)
+ public void testTezSessionShutdown() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ testMRRSleepJobDagSubmitCore(true, false, true, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testAMSplitGeneration() throws IOException, InterruptedException,
+ TezException, ClassNotFoundException, YarnException {
+ testMRRSleepJobDagSubmitCore(true, false, false, true);
+ }
+
+ public State testMRRSleepJobDagSubmitCore(
+ boolean dagViaRPC,
+ boolean killDagWhileRunning,
+ boolean closeSessionBeforeSubmit,
+ boolean genSplitsInAM) throws IOException,
+ InterruptedException, TezException, ClassNotFoundException,
+ YarnException {
+ return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+ closeSessionBeforeSubmit, null, genSplitsInAM);
+ }
+
+ private Map<String, String> createCommonEnv() {
+ Map<String, String> commonEnv = new HashMap<String, String>();
+ return commonEnv;
+ }
+
+ public State testMRRSleepJobDagSubmitCore(
+ boolean dagViaRPC,
+ boolean killDagWhileRunning,
+ boolean closeSessionBeforeSubmit,
+ TezSession reUseTezSession,
+ boolean genSplitsInAM) throws IOException,
+ InterruptedException, TezException, ClassNotFoundException,
+ YarnException {
+ LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
+
+ JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
+ JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
+ JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
+
+ stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
+ stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
+ stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
+ stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+ stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+ SleepInputFormat.class.getName());
+ stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ MRRSleepJobPartitioner.class.getName());
+
+ stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+ stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+ stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ stage2Conf
+ .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+ stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ MRRSleepJobPartitioner.class.getName());
+
+ JobConf stage22Conf = new JobConf(stage2Conf);
+ stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
+
+ stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+ stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+ stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+ stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+ NullOutputFormat.class.getName());
+
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
+ stage1Conf);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
+ stage1Conf);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
+ stage2Conf); // this also works stage22 as it sets up keys etc
+
+ MRHelpers.doJobClientMagic(stage1Conf);
+ MRHelpers.doJobClientMagic(stage2Conf);
+ MRHelpers.doJobClientMagic(stage22Conf);
+ MRHelpers.doJobClientMagic(stage3Conf);
+
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+ InputSplitInfo inputSplitInfo = null;
+ if (!genSplitsInAM) {
+ inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
+ remoteStagingDir);
+ }
+
+ byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
+ byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
+
+ DAG dag = new DAG("testMRRSleepJobDagSubmit");
+ int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
+ Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+ : null;
+ Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
+ MapProcessor.class.getName()).setUserPayload(stage1Payload),
+ stage1NumTasks, Resource.newInstance(256, 1));
+ MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
+ Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
+ ReduceProcessor.class.getName()).setUserPayload(
+ MRHelpers.createUserPayloadFromConf(stage2Conf)),
+ 1, Resource.newInstance(256, 1));
+ Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
+ ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
+ 1, Resource.newInstance(256, 1));
+ MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+
+ Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+ Map<String, String> commonEnv = createCommonEnv();
+
+ if (!genSplitsInAM) {
+ // TODO Use utility method post TEZ-205.
+ Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+ stage1LocalResources.put(
+ inputSplitInfo.getSplitsFile().getName(),
+ createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ stage1LocalResources.put(
+ inputSplitInfo.getSplitsMetaInfoFile().getName(),
+ createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ stage1LocalResources.putAll(commonLocalResources);
+
+ stage1Vertex.setTaskLocalResources(stage1LocalResources);
+ stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+ } else {
+ stage1Vertex.setTaskLocalResources(commonLocalResources);
+ }
+
+ stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+ stage1Vertex.setTaskEnvironment(commonEnv);
+
+ // TODO env, resources
+
+ stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
+ stage2Vertex.setTaskLocalResources(commonLocalResources);
+ stage2Vertex.setTaskEnvironment(commonEnv);
+
+ stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
+ stage3Vertex.setTaskLocalResources(commonLocalResources);
+ stage3Vertex.setTaskEnvironment(commonEnv);
+
+ dag.addVertex(stage1Vertex);
+ dag.addVertex(stage2Vertex);
+ dag.addVertex(stage3Vertex);
+
+ Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, new OutputDescriptor(
+ OnFileSortedOutput.class.getName()), new InputDescriptor(
+ ShuffledMergedInputLegacy.class.getName())));
+ Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, new OutputDescriptor(
+ OnFileSortedOutput.class.getName()), new InputDescriptor(
+ ShuffledMergedInputLegacy.class.getName())));
+
+ dag.addEdge(edge1);
+ dag.addEdge(edge2);
+
+ Map<String, LocalResource> amLocalResources =
+ new HashMap<String, LocalResource>();
+ amLocalResources.putAll(commonLocalResources);
+
+ TezConfiguration tezConf = new TezConfiguration(
+ mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+
+ TezClient tezClient = new TezClient(tezConf);
+ DAGClient dagClient = null;
+ TezSession tezSession = null;
+ boolean reuseSession = reUseTezSession != null;
+ TezSessionConfiguration tezSessionConfig;
+ AMConfiguration amConfig = new AMConfiguration(
+ "default", commonEnv, amLocalResources,
+ tezConf, null);
+ if(!dagViaRPC) {
+ // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
+ dagClient = tezClient.submitDAGApplication(dag, amConfig);
+ } else {
+ if (reuseSession) {
+ tezSession = reUseTezSession;
+ } else {
+ tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("testsession", tezSessionConfig);
+ tezSession.start();
+ }
+ }
+
+ if (dagViaRPC && closeSessionBeforeSubmit) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+ boolean sentKillSession = false;
+ while(true) {
+ Thread.sleep(500l);
+ ApplicationReport appReport =
+ yarnClient.getApplicationReport(tezSession.getApplicationId());
+ if (appReport == null) {
+ continue;
+ }
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (!sentKillSession) {
+ if (appState == YarnApplicationState.RUNNING) {
+ tezSession.stop();
+ sentKillSession = true;
+ }
+ } else {
+ if (appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.KILLED
+ || appState == YarnApplicationState.FAILED) {
+ LOG.info("Application completed after sending session shutdown"
+ + ", yarnApplicationState=" + appState
+ + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appState);
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+ appReport.getFinalApplicationStatus());
+ break;
+ }
+ }
+ }
+ yarnClient.stop();
+ return null;
+ }
+
+ if(dagViaRPC) {
+ LOG.info("Submitting dag to tez session with appId="
+ + tezSession.getApplicationId());
+ dagClient = tezSession.submitDAG(dag);
+ Assert.assertEquals(TezSessionStatus.RUNNING,
+ tezSession.getSessionStatus());
+ }
+ DAGStatus dagStatus = dagClient.getDAGStatus();
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for job to complete. Sleeping for 500ms."
+ + " Current state: " + dagStatus.getState());
+ Thread.sleep(500l);
+ if(killDagWhileRunning
+ && dagStatus.getState() == DAGStatus.State.RUNNING) {
+ LOG.info("Killing running dag/session");
+ if (dagViaRPC) {
+ tezSession.stop();
+ } else {
+ dagClient.tryKillDAG();
+ }
+ }
+ dagStatus = dagClient.getDAGStatus();
+ }
+ if (dagViaRPC && !reuseSession) {
+ tezSession.stop();
+ }
+ return dagStatus.getState();
+ }
+
+ private static LocalResource createLocalResource(FileSystem fc, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility)
+ throws IOException {
+ FileStatus fstat = fc.getFileStatus(file);
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+ .getPath()));
+ long resourceSize = fstat.getLen();
+ long resourceModificationTime = fstat.getModificationTime();
+
+ return LocalResource.newInstance(resourceURL, type, visibility,
+ resourceSize, resourceModificationTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
new file mode 100644
index 0000000..8926ed8
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -0,0 +1,172 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to
+ */
+public class MiniTezCluster extends MiniYARNCluster {
+
+ public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+ private static final Log LOG = LogFactory.getLog(MiniTezCluster.class);
+
+ private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+ private Path confFilePath;
+
+ public MiniTezCluster(String testName) {
+ this(testName, 1);
+ }
+
+ public MiniTezCluster(String testName, int noOfNMs) {
+ super(testName, noOfNMs, 4, 4);
+ }
+
+ public MiniTezCluster(String testName, int noOfNMs,
+ int numLocalDirs, int numLogDirs) {
+ super(testName, noOfNMs, numLocalDirs, numLogDirs);
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+ if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+ "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+ }
+
+ if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+ // nothing defined. set quick delete value
+ conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+ }
+
+ File appJarFile = new File(MiniTezCluster.APPJAR);
+
+ if (!appJarFile.exists()) {
+ String message = "TezAppJar " + MiniTezCluster.APPJAR
+ + " not found. Exiting.";
+ LOG.info(message);
+ throw new TezUncheckedException(message);
+ } else {
+ conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
+ LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+ }
+
+ // VMEM monitoring disabled, PMEM monitoring enabled.
+ conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+ conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+
+ try {
+ Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+ FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+ if (fc.util().exists(stagingPath)) {
+ LOG.info(stagingPath + " exists! deleting...");
+ fc.delete(stagingPath, true);
+ }
+ LOG.info("mkdir: " + stagingPath);
+ fc.mkdir(stagingPath, null, true);
+
+ //mkdir done directory as well
+ String doneDir =
+ JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+ Path doneDirPath = fc.makeQualified(new Path(doneDir));
+ fc.mkdir(doneDirPath, null, true);
+ } catch (IOException e) {
+ throw new TezUncheckedException("Could not create staging directory. ", e);
+ }
+ conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+ //configure the shuffle service in NM
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+ Service.class);
+
+ // Non-standard shuffle port
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+ conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+ DefaultContainerExecutor.class, ContainerExecutor.class);
+
+ // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+ // for corresponding uberized tests.
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ super.serviceStart();
+ File workDir = super.getTestWorkDir();
+ Configuration conf = super.getConfig();
+
+ confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+ File confFile = new File(confFilePath.toString());
+ try {
+ confFile.createNewFile();
+ conf.writeXml(new FileOutputStream(confFile));
+ confFile.deleteOnExit();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ confFilePath = new Path(confFile.getAbsolutePath());
+ conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+ LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+ + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+ }
+
+ public Path getConfigFilePath() {
+ return confFilePath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/findbugs-exclude.xml b/tez-yarn-client/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-yarn-client/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>