You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/10/25 22:18:28 UTC

samza git commit: SAMZA-1470: Wrong job status returned by YarnRestJobStatusProvider when there are multiple apps

Repository: samza
Updated Branches:
  refs/heads/master 5f81b8d13 -> 85184d05b


SAMZA-1470: Wrong job status returned by YarnRestJobStatusProvider when there are multiple apps

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Jagadish <jv...@linkedin.com>

Closes #339 from jmakes/samza-1470-4


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/85184d05
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/85184d05
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/85184d05

Branch: refs/heads/master
Commit: 85184d05b99a698c08b120914060a3240ea03d2b
Parents: 5f81b8d
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Oct 25 15:18:11 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Oct 25 15:18:11 2017 -0700

----------------------------------------------------------------------
 .../rest/model/yarn/YarnApplicationInfo.java    | 12 ++-
 .../proxy/job/YarnRestJobStatusProvider.java    | 43 ++++++-----
 .../job/TestYarnRestJobStatusProvider.java      | 77 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
index 1c7f757..8d55c89 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/yarn/YarnApplicationInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.rest.model.yarn;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.rest.proxy.job.JobInstance;
@@ -40,14 +41,11 @@ public class YarnApplicationInfo {
   }
 
   /**
-   * Returns a Map with all the apps and their names as the key.
+   * Returns an unmodifiable view of all the Yarn applications held by this object.
+   * @return the full list of Yarn applications. There will likely be more than one per job-instance.
    */
-  public Map<String, YarnApplication> getApplications() {
-    Map<String, YarnApplication> applications = new HashMap<>();
-    for (YarnApplication app: this.apps) {
-      applications.put(app.getName(), app);
-    }
-    return applications;
+  public List<YarnApplication> getYarnApplications() {
+    return Collections.unmodifiableList(apps);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
index 63a1ae4..a28e4b2 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/YarnRestJobStatusProvider.java
@@ -29,9 +29,10 @@ import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.samza.SamzaException;
-import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
+import org.apache.samza.rest.model.yarn.YarnApplicationInfo.YarnApplication;
 import org.apache.samza.rest.resources.JobsResourceConfig;
 import org.apache.samza.rest.resources.YarnJobResourceConfig;
 import org.codehaus.jackson.map.DeserializationConfig;
@@ -39,7 +40,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * An implementation of the {@link JobStatusProvider} that retrieves
  * the job status from the YARN REST api.
@@ -52,12 +52,12 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
   private final String apiEndpoint;
   private final HttpClient httpClient;
 
-  public YarnRestJobStatusProvider(JobsResourceConfig config) {
+  YarnRestJobStatusProvider(JobsResourceConfig config) {
     YarnJobResourceConfig yarnConfig = new YarnJobResourceConfig(config);
+
     this.httpClient = new HttpClient();
+    this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps", yarnConfig.getYarnResourceManagerEndpoint());
     OBJECT_MAPPER.configure(DeserializationConfig.Feature.UNWRAP_ROOT_VALUE, true);
-    this.apiEndpoint = String.format("http://%s/ws/v1/cluster/apps",
-        yarnConfig.getYarnResourceManagerEndpoint());
   }
 
   @Override
@@ -66,21 +66,28 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
     if (jobs == null || jobs.isEmpty()) {
       return;
     }
+
+    // We will identify the YARN application states by their qualified job names, so build a map
+    // to translate back from that name to the Job we wish to populate.
+    final Map<String, Job> qualifiedJobToInfo = new HashMap<>();
+    for(Job job : jobs) {
+      qualifiedJobToInfo.put(YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId())), job);
+    }
+
     try {
       byte[] response = httpGet(apiEndpoint);
       YarnApplicationInfo yarnApplicationInfo = OBJECT_MAPPER.readValue(response, YarnApplicationInfo.class);
-      Map<String, YarnApplicationInfo.YarnApplication> yarnApplications = yarnApplicationInfo.getApplications();
-      for (Job job: jobs) {
-        String qualifiedJobName = YarnApplicationInfo.getQualifiedJobName(new JobInstance(job.getJobName(), job.getJobId()));
-        YarnApplicationInfo.YarnApplication yarnApp = yarnApplications.get(qualifiedJobName);
-        if (yarnApp == null) {
-          job.setStatusDetail(JobStatus.UNKNOWN.toString());
-          job.setStatus(JobStatus.UNKNOWN);
-          continue;
-        }
-        JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(yarnApp.getState().toUpperCase()));
-        if (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED) {
-          job.setStatusDetail(yarnApp.getState());
+
+      // There can be multiple Yarn apps for each qualified job name, so we iterate the former and match with latter.
+      for (YarnApplication app: yarnApplicationInfo.getYarnApplications()) {
+        Job job = qualifiedJobToInfo.get(app.getName());
+        JobStatus samzaStatus = yarnStateToSamzaStatus(YarnApplicationState.valueOf(app.getState().toUpperCase()));
+
+        // If job is null, it wasn't requested, so skip it.  A job's statusDetail defaults to null, so always
+        // update a job that has no detail yet.  Otherwise only overwrite when this application's status is not
+        // STOPPED, because there could be many application attempts for the job, and we want the RUNNING one if it exists.
+        if (job != null && (job.getStatusDetail() == null || samzaStatus != JobStatus.STOPPED)) {
+          job.setStatusDetail(app.getState());
           job.setStatus(samzaStatus);
         }
       }
@@ -126,7 +133,7 @@ public class YarnRestJobStatusProvider implements JobStatusProvider {
    * @return the response
    * @throws IOException if there are problems with the http get request.
    */
-  private byte[] httpGet(String requestUrl)
+  byte[] httpGet(String requestUrl)
       throws IOException {
     GetMethod getMethod = new GetMethod(requestUrl);
     try {

http://git-wip-us.apache.org/repos/asf/samza/blob/85184d05/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
new file mode 100644
index 0000000..0875a6f
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/rest/proxy/job/TestYarnRestJobStatusProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest.proxy.job;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.rest.model.Job;
+import org.apache.samza.rest.model.JobStatus;
+import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestYarnRestJobStatusProvider extends TestCase {
+  private YarnRestJobStatusProvider provider;
+
+  private static final String APPS_RESPONSE =
+      "{\"apps\":{\"app\":[{\"id\":\"application_1502919535296_0161\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+          + "{\"id\":\"application_1502919535296_0163\",\"name\":\"job1_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"},"
+          + "{\"id\":\"application_1502919535296_0162\",\"name\":\"job1_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+          + "{\"id\":\"application_1502919535296_0165\",\"name\":\"job2_1\",\"state\":\"KILLED\",\"finalStatus\":\"KILLED\",\"applicationType\":\"Samza\"},"
+          + "{\"id\":\"application_1502919535296_0164\",\"name\":\"job3_1\",\"state\":\"RUNNING\",\"finalStatus\":\"UNDEFINED\",\"applicationType\":\"Samza\"}]}}";
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    provider = spy(new YarnRestJobStatusProvider(new JobsResourceConfig(new MapConfig())));
+  }
+
+  @Test
+  public void testGetJobStatuses() throws IOException, InterruptedException {
+    doReturn(APPS_RESPONSE.getBytes()).when(provider).httpGet(anyString());
+
+    List<Job> jobs = Lists.newArrayList(
+        new Job("job1", "1"),  // Job with multiple applications, 1 RUNNING
+        new Job("job2", "1"),  // Job with 1 KILLED application
+        new Job("job3", "1"),  // Job with 1 RUNNING application
+        new Job("job4", "1")); // Job not found in YARN
+    provider.getJobStatuses(jobs);
+
+    Collections.sort(jobs, (o1, o2) -> o1.getJobName().compareTo(o2.getJobName()));
+
+    assertEquals(4, jobs.size());
+    verifyJobStatus(jobs.get(0), "job1", JobStatus.STARTED, "RUNNING");
+    verifyJobStatus(jobs.get(1), "job2", JobStatus.STOPPED, "KILLED");
+    verifyJobStatus(jobs.get(2), "job3", JobStatus.STARTED, "RUNNING");
+    verifyJobStatus(jobs.get(3), "job4", JobStatus.UNKNOWN, null);
+  }
+
+  private void verifyJobStatus(Job job, String jobName, JobStatus samzaStatus, String yarnStatus) {
+    assertEquals(jobName, job.getJobName());
+    assertEquals(samzaStatus, job.getStatus());
+    assertEquals(yarnStatus, job.getStatusDetail());
+  }
+}