You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/25 22:18:28 UTC
samza git commit: SAMZA-1470: Wrong job status returned by YarnRestJobStatusProvider when there are multiple apps
Repository: samza
Updated Branches:
refs/heads/master 5f81b8d13 -> 85184d05b
SAMZA-1470: Wrong job status returned by YarnRestJobStatusProvider when there are multiple apps
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Jagadish <jv...@linkedin.com>
Closes #339 from jmakes/samza-1470-4
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/85184d05
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/85184d05
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/85184d05
Branch: refs/heads/master
Commit: 85184d05b99a698c08b120914060a3240ea03d2b
Parents: 5f81b8d
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Oct 25 15:18:11 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Oct 25 15:18:11 2017 -0700
----------------------------------------------------------------------
.../rest/model/yarn/YarnApplicationInfo.java | 12 ++-
.../proxy/job/YarnRestJobStatusProvider.java | 43 ++++++-----
.../job/TestYarnRestJobStatusProvider.java | 77 ++++++++++++++++++++
3 files changed, 107 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
index 1c7f757..8d55c89 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.rest.model.yarn;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.rest.proxy.job.JobInstance;
@@ -40,14 +41,11 @@ public class YarnApplicationInfo {
}
/**
- * Returns a Map with all the apps and their names as the key.
+ *
+ * @return the full list of Yarn applications. There will likely be more than one per job-instance.
*/
- public Map<String, YarnApplication> getApplications() {
- Map<String, YarnApplication> applications = new HashMap<>();
- for (YarnApplication app: this.apps) {
- applications.put(app.getName(), app);
- }
- return applications;
+ public List<YarnApplication> getYarnApplications() {
+ return Collections.unmodifiableList(apps);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
index 63a1ae4..a28e4b2 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
@@ -29,9 +29,10 @@ import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.samza.SamzaException;
-import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
import org.apache.samza.rest.model.Job;
import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo.YarnApplication;
import org.apache.samza.rest.resources.JobsResourceConfig;
import org.apache.samza.rest.resources.YarnJobResourceConfig;
import org.codehaus.jackson.map.DeserializationConfig;
@@ -39,7 +40,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* An implementation of the {@link JobStatusProvider} that retrieves
* the job status from the YARN REST api.
@@ -52,12 +52,12 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
private final String apiEndpoint;
private final HttpClient httpClient;
- public YarnRestJobStatusProvider(JobsResourceConfig config) {
+ YarnRestJobStatusProvider(JobsResourceConfig config) {
YarnJobResourceConfig yarnConfig = new YarnJobResourceConfig(config);
+
this.httpClient = new HttpClient();
+ this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps", yarnConfig.getYarnResourceManagerEndpoint());
OBJECT_MAPPER.configure(DeserializationConfig.Feature.UNWRAP_ROOT_VALUE, true);
- this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps",
- yarnConfig.getYarnResourceManagerEndpoint());
}
@Override
@@ -66,21 +66,28 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
if (jobs == null || jobs.isEmpty()) {
return;
}
+
+ // We will identify the YARN application states by their qualified names, so build a map
+ // to translate back from that name to the JobInfo we wish to populate.
+ final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
+ for(Job job : jobs) {
+ qualifiedJobToInfo.put(YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())), job);
+ }
+
try {
byte[] response = httpGet(apiEndpoint);
YarnApplicationInfo yarnApplicationInfo = OBJECT_MAPPER.readValue(response, YarnApplicationInfo.class);
- Map<String, YarnApplicationInfo.YarnApplication> yarnApplications = yarnApplicationInfo.getApplications();
- for (Job job: jobs) {
- String qualifiedJobName = YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId()));
- YarnApplicationInfo.YarnApplication yarnApp = yarnApplications.get(qualifiedJobName);
- if (yarnApp == null) {
- job.setStatusDetail(JobStatus.UNKNOWN.toString());
- job.setStatus(JobStatus.UNKNOWN);
- continue;
- }
- JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnApp.getState().toUpperCase()));
- if (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED) {
- job.setStatusDetail(yarnApp.getState());
+
+ // There can be multiple Yarn apps for each qualified job name, so we iterate the former and match with latter.
+ for (YarnApplication app: yarnApplicationInfo.getYarnApplications()) {
+ Job job = qualifiedJobToInfo.get(app.getName());
+ JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(app.getState().toUpperCase()));
+
+ // If job is null, it wasn't requested. The default statusDetail is null so always update in that case.
+ // Only update the job status if the current status is not STOPPED because there could be many
+ // application attempts for the job, and we're interested in the RUNNING one if it exists.
+ if (job != null && (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED)) {
+ job.setStatusDetail(app.getState());
job.setStatus(samzaStatus);
}
}
@@ -126,7 +133,7 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
* @return the response
* @throws IOException if there are problems with the http get request.
*/
- private byte[] httpGet(String requestUrl)
+ byte[] httpGet(String requestUrl)
throws IOException {
GetMethod getMethod = new GetMethod(requestUrl);
try {
http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
new file mode 100644
index 0000000..0875a6f
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestYarnRestJobStatusProvider extends TestCase {
+ private YarnRestJobStatusProvider provider;
+
+ private static final String APPS_RESPONSE =
+ "{\"apps\":{\"app\":[{\"id\":\"application_1502919535296_0161\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+ + "{\"id\":\"application_1502919535296_0163\",\"name\":\"job1_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"},"
+ + "{\"id\":\"application_1502919535296_0162\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+ + "{\"id\":\"application_1502919535296_0165\",\"name\":\"job2_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+ + "{\"id\":\"application_1502919535296_0164\",\"name\":\"job3_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"}]}}";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ provider = spy(new YarnRestJobStatusProvider(new JobsResourceConfig(new MapConfig())));
+ }
+
+ @Test
+ public void testGetJobStatuses() throws IOException, InterruptedException {
+ doReturn(APPS_RESPONSE.getBytes()).when(provider).httpGet(anyString());
+
+ List<Job> jobs = Lists.newArrayList(
+ new Job("job1", "1"), // Job with multiple applications, 1 RUNNING
+ new Job("job2", "1"), // Job with 1 KILLED application
+ new Job("job3", "1"), // Job with 1 RUNNING application
+ new Job("job4", "1")); // Job not found in YARN
+ provider.getJobStatuses(jobs);
+
+ Collections.sort(jobs, (o1, o2) -> o1.getJobName().compareTo(o2.getJobName()));
+
+ assertEquals(4, jobs.size());
+ verifyJobStatus(jobs.get(0), "job1", JobStatus.STARTED, "RUNNING");
+ verifyJobStatus(jobs.get(1), "job2", JobStatus.STOPPED, "KILLED");
+ verifyJobStatus(jobs.get(2), "job3", JobStatus.STARTED, "RUNNING");
+ verifyJobStatus(jobs.get(3), "job4", JobStatus.UNKNOWN, null);
+ }
+
+ private void verifyJobStatus(Job job, String jobName, JobStatus samzaStatus, String yarnStatus) {
+ assertEquals(jobName, job.getJobName());
+ assertEquals(samzaStatus, job.getStatus());
+ assertEquals(yarnStatus, job.getStatusDetail());
+ }
+}