You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/10/26 06:31:28 UTC
svn commit: r1189023 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/
Author: mahadev
Date: Wed Oct 26 04:31:28 2011
New Revision: 1189023
URL: http://svn.apache.org/viewvc?rev=1189023&view=rev
Log:
MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM and prints a lots of logs. (vinodkv via mahadev)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Oct 26 04:31:28 2011
@@ -1776,6 +1776,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2977. Fix ResourceManager to renew HDFS delegation tokens for
applications. (acmurthy)
+ MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM
+ and prints a lots of logs. (vinodkv via mahadev)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Wed Oct 26 04:31:28 2011
@@ -81,7 +81,6 @@ public class ClientServiceDelegate {
private final ApplicationId appId;
private final ResourceMgrDelegate rm;
private final MRClientProtocol historyServerProxy;
- private boolean forceRefresh;
private MRClientProtocol realProxy = null;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
@@ -122,7 +121,7 @@ public class ClientServiceDelegate {
}
private MRClientProtocol getProxy() throws YarnRemoteException {
- if (!forceRefresh && realProxy != null) {
+ if (realProxy != null) {
return realProxy;
}
@@ -133,7 +132,9 @@ public class ClientServiceDelegate {
trackingUrl = application.getTrackingUrl();
}
String serviceAddr = null;
- while (application == null || YarnApplicationState.RUNNING == application.getYarnApplicationState()) {
+ while (application == null
+ || YarnApplicationState.RUNNING == application
+ .getYarnApplicationState()) {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
@@ -163,7 +164,7 @@ public class ClientServiceDelegate {
}
LOG.info("Tracking Url of JOB is " + application.getTrackingUrl());
LOG.info("Connecting to " + serviceAddr);
- instantiateAMProxy(serviceAddr);
+ realProxy = instantiateAMProxy(serviceAddr);
return realProxy;
} catch (IOException e) {
//possibly the AM has crashed
@@ -233,10 +234,12 @@ public class ClientServiceDelegate {
return historyServerProxy;
}
- private void instantiateAMProxy(final String serviceAddr) throws IOException {
+ MRClientProtocol instantiateAMProxy(final String serviceAddr)
+ throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
- realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+ MRClientProtocol proxy = currentUser
+ .doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
YarnRPC rpc = YarnRPC.create(conf);
@@ -245,6 +248,7 @@ public class ClientServiceDelegate {
}
});
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+ return proxy;
}
private synchronized Object invoke(String method, Class argClass,
@@ -274,12 +278,14 @@ public class ClientServiceDelegate {
" retrying..");
LOG.debug("Failed exception on AM/History contact",
e.getTargetException());
- forceRefresh = true;
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
} catch (Exception e) {
LOG.info("Failed to contact AM/History for job " + jobId
+ " Will retry..");
LOG.debug("Failing to contact application master", e);
- forceRefresh = true;
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
}
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Wed Oct 26 04:31:28 2011
@@ -68,8 +68,6 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -84,6 +82,8 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -123,20 +123,24 @@ public class TestClientRedirect {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
+
+ // Start the RM.
RMService rmService = new RMService("test");
rmService.init(conf);
rmService.start();
+ // Start the AM.
AMService amService = new AMService();
amService.init(conf);
amService.start(conf);
- amRunning = true;
+ // Start the HS.
HistoryService historyService = new HistoryService();
historyService.init(conf);
historyService.start(conf);
LOG.info("services started");
+
Cluster cluster = new Cluster(conf);
org.apache.hadoop.mapreduce.JobID jobID =
new org.apache.hadoop.mapred.JobID("201103121733", 1);
@@ -151,13 +155,13 @@ public class TestClientRedirect {
//bring down the AM service
amService.stop();
- amRunning = false;
LOG.info("Sleeping for 5 seconds after stop for" +
" the server to exit cleanly..");
Thread.sleep(5000);
amRestarting = true;
+
// Same client
//results are returned from fake (not started job)
counters = cluster.getJob(jobID).getCounters();
@@ -181,14 +185,15 @@ public class TestClientRedirect {
amService = new AMService();
amService.init(conf);
amService.start(conf);
- amRunning = true;
amContact = false; //reset
counters = cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);
- amRunning = false;
+ // Stop the AM. It is not even restarting. So it should be treated as
+ // completed.
+ amService.stop();
// Same client
counters = cluster.getJob(jobID).getCounters();
@@ -347,6 +352,7 @@ public class TestClientRedirect {
private InetSocketAddress bindAddress;
private Server server;
private final String hostAddress;
+
public AMService() {
this(AMHOSTADDRESS);
}
@@ -376,11 +382,13 @@ public class TestClientRedirect {
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ ":" + server.getPort());
super.start();
+ amRunning = true;
}
public void stop() {
server.stop();
super.stop();
+ amRunning = false;
}
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Wed Oct 26 04:31:28 2011
@@ -1,208 +1,272 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
-
-/**
- * Tests for ClientServiceDelegate.java
- */
-
-public class TestClientServiceDelegate {
- private JobID oldJobId = JobID.forName("job_1315895242400_2");
- private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
- .toYarn(oldJobId);
-
- @Test
- public void testUnknownAppInRM() throws Exception {
- MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
- when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
- getJobReportResponse());
- ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
- historyServerProxy, getRMDelegate());
-
- JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertNotNull(jobStatus);
- }
-
- @Test
- public void testRemoteExceptionFromHistoryServer() throws Exception {
-
- MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
- when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
- RPCUtil.getRemoteException("Job ID doesnot Exist"));
-
- ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
- when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
- .thenReturn(null);
-
- ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
- historyServerProxy, rm);
-
- try {
- clientServiceDelegate.getJobStatus(oldJobId);
- Assert.fail("Invoke should throw exception after retries.");
- } catch (YarnRemoteException e) {
- Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
- }
- }
-
- @Test
- public void testRetriesOnConnectionFailure() throws Exception {
-
- MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
- when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
- new RuntimeException("1")).thenThrow(new RuntimeException("2"))
- .thenThrow(new RuntimeException("3"))
- .thenReturn(getJobReportResponse());
-
- ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
- when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
- .thenReturn(null);
-
- ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
- historyServerProxy, rm);
-
- JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertNotNull(jobStatus);
- }
-
- @Test
- public void testHistoryServerNotConfigured() throws Exception {
- //RM doesn't have app report and job History Server is not configured
- ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
- null, getRMDelegate());
- JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertEquals("N/A", jobStatus.getUsername());
- Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
-
- //RM has app report and job History Server is not configured
- ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
- ApplicationReport applicationReport = getApplicationReport();
- when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
- applicationReport);
-
- clientServiceDelegate = getClientServiceDelegate(null, rm);
- jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
- Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
- }
-
-
- @Test
- public void testJobReportFromHistoryServer() throws Exception {
- MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
- when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
- getJobReportResponseFromHistoryServer());
- ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
- when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
- .thenReturn(null);
- ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
- historyServerProxy, rm);
-
- JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertNotNull(jobStatus);
- Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
- Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
- Assert.assertEquals(1.0f, jobStatus.getMapProgress());
- Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
- }
-
- private GetJobReportRequest getJobReportRequest() {
- GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
- request.setJobId(jobId);
- return request;
- }
-
- private GetJobReportResponse getJobReportResponse() {
- GetJobReportResponse jobReportResponse = Records
- .newRecord(GetJobReportResponse.class);
- JobReport jobReport = Records.newRecord(JobReport.class);
- jobReport.setJobId(jobId);
- jobReport.setJobState(JobState.SUCCEEDED);
- jobReportResponse.setJobReport(jobReport);
- return jobReportResponse;
- }
-
- private ApplicationReport getApplicationReport() {
- ApplicationReport applicationReport = Records
- .newRecord(ApplicationReport.class);
- applicationReport.setYarnApplicationState(YarnApplicationState.FINISHED);
- applicationReport.setUser("root");
- applicationReport.setHost("N/A");
- applicationReport.setName("N/A");
- applicationReport.setQueue("N/A");
- applicationReport.setStartTime(0);
- applicationReport.setFinishTime(0);
- applicationReport.setTrackingUrl("N/A");
- applicationReport.setDiagnostics("N/A");
- applicationReport.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
- return applicationReport;
- }
-
- private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
- ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
- when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
- return rm;
- }
-
- private ClientServiceDelegate getClientServiceDelegate(
- MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
- Configuration conf = new YarnConfiguration();
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
- ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
- conf, rm, oldJobId, historyServerProxy);
- return clientServiceDelegate;
- }
-
- private GetJobReportResponse getJobReportResponseFromHistoryServer() {
- GetJobReportResponse jobReportResponse = Records
- .newRecord(GetJobReportResponse.class);
- JobReport jobReport = Records.newRecord(JobReport.class);
- jobReport.setJobId(jobId);
- jobReport.setJobState(JobState.SUCCEEDED);
- jobReport.setMapProgress(1.0f);
- jobReport.setReduceProgress(1.0f);
- jobReport.setJobFile("TestJobFilePath");
- jobReport.setTrackingUrl("TestTrackingUrl");
- jobReportResponse.setJobReport(jobReport);
- return jobReportResponse;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Test;
+
+/**
+ * Tests for ClientServiceDelegate.java
+ */
+
+public class TestClientServiceDelegate {
+ private JobID oldJobId = JobID.forName("job_1315895242400_2");
+ private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
+ .toYarn(oldJobId);
+
+ @Test
+ public void testUnknownAppInRM() throws Exception {
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
+ getJobReportResponse());
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ historyServerProxy, getRMDelegate());
+
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ }
+
+ @Test
+ public void testRemoteExceptionFromHistoryServer() throws Exception {
+
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+ RPCUtil.getRemoteException("Job ID doesnot Exist"));
+
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+ .thenReturn(null);
+
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ historyServerProxy, rm);
+
+ try {
+ clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.fail("Invoke should throw exception after retries.");
+ } catch (YarnRemoteException e) {
+ Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRetriesOnConnectionFailure() throws Exception {
+
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+ new RuntimeException("1")).thenThrow(new RuntimeException("2"))
+ .thenThrow(new RuntimeException("3"))
+ .thenReturn(getJobReportResponse());
+
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+ .thenReturn(null);
+
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ historyServerProxy, rm);
+
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ verify(historyServerProxy, times(4)).getJobReport(
+ any(GetJobReportRequest.class));
+ }
+
+ @Test
+ public void testHistoryServerNotConfigured() throws Exception {
+ //RM doesn't have app report and job History Server is not configured
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ null, getRMDelegate());
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertEquals("N/A", jobStatus.getUsername());
+ Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
+
+ //RM has app report and job History Server is not configured
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ ApplicationReport applicationReport = getFinishedApplicationReport();
+ when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
+ applicationReport);
+
+ clientServiceDelegate = getClientServiceDelegate(null, rm);
+ jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
+ }
+
+ @Test
+ public void testJobReportFromHistoryServer() throws Exception {
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
+ getJobReportResponseFromHistoryServer());
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+ .thenReturn(null);
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+ historyServerProxy, rm);
+
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
+ Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
+ Assert.assertEquals(1.0f, jobStatus.getMapProgress());
+ Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
+ }
+
+ @Test
+ public void testReconnectOnAMRestart() throws IOException {
+
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+
+ // RM returns AM1 url, null, null and AM2 url on invocations.
+ // Nulls simulate the time when AM2 is in the process of restarting.
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
+ getRunningApplicationReport("am1", 78)).thenReturn(
+ getRunningApplicationReport(null, 0)).thenReturn(
+ getRunningApplicationReport(null, 0)).thenReturn(
+ getRunningApplicationReport("am2", 90));
+
+ GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
+ when(jobReportResponse1.getJobReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
+ JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+
+ // First AM returns a report with jobName firstGen and simulates AM shutdown
+ // on second invocation.
+ MRClientProtocol firstGenAMProxy = mock(MRClientProtocol.class);
+ when(firstGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenReturn(jobReportResponse1).thenThrow(
+ new RuntimeException("AM is down!"));
+
+ GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
+ when(jobReportResponse2.getJobReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
+ JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+
+ // Second AM generation returns a report with jobName secondGen
+ MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
+ when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenReturn(jobReportResponse2);
+
+ ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
+ historyServerProxy, rmDelegate));
+ // First time, connection should be to AM1, then to AM2. Further requests
+ // should use the same proxy to AM2 and so instantiateProxy shouldn't be
+ // called.
+ doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
+ clientServiceDelegate).instantiateAMProxy(any(String.class));
+
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ Assert.assertEquals("jobName-firstGen", jobStatus.getJobName());
+
+ jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+ jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.assertNotNull(jobStatus);
+ Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+ verify(clientServiceDelegate, times(2)).instantiateAMProxy(
+ any(String.class));
+ }
+
+ private GetJobReportRequest getJobReportRequest() {
+ GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
+ request.setJobId(jobId);
+ return request;
+ }
+
+ private GetJobReportResponse getJobReportResponse() {
+ GetJobReportResponse jobReportResponse = Records
+ .newRecord(GetJobReportResponse.class);
+ JobReport jobReport = Records.newRecord(JobReport.class);
+ jobReport.setJobId(jobId);
+ jobReport.setJobState(JobState.SUCCEEDED);
+ jobReportResponse.setJobReport(jobReport);
+ return jobReportResponse;
+ }
+
+ private ApplicationReport getFinishedApplicationReport() {
+ return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+ 1234, 5), "user", "queue", "appname", "host", 124, null,
+ YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
+ FinalApplicationStatus.SUCCEEDED, null);
+ }
+
+ private ApplicationReport getRunningApplicationReport(String host, int port) {
+ return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+ 1234, 5), "user", "queue", "appname", host, port, null,
+ YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
+ FinalApplicationStatus.UNDEFINED, null);
+ }
+
+ private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+ when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
+ return rm;
+ }
+
+ private ClientServiceDelegate getClientServiceDelegate(
+ MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
+ Configuration conf = new YarnConfiguration();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rm, oldJobId, historyServerProxy);
+ return clientServiceDelegate;
+ }
+
+ private GetJobReportResponse getJobReportResponseFromHistoryServer() {
+ GetJobReportResponse jobReportResponse = Records
+ .newRecord(GetJobReportResponse.class);
+ JobReport jobReport = Records.newRecord(JobReport.class);
+ jobReport.setJobId(jobId);
+ jobReport.setJobState(JobState.SUCCEEDED);
+ jobReport.setMapProgress(1.0f);
+ jobReport.setReduceProgress(1.0f);
+ jobReport.setJobFile("TestJobFilePath");
+ jobReport.setTrackingUrl("TestTrackingUrl");
+ jobReportResponse.setJobReport(jobReport);
+ return jobReportResponse;
+ }
+}