You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/02 20:07:37 UTC
samza git commit: SAMZA-930: fix issue with json deserialisation in
YarnUtil
Repository: samza
Updated Branches:
refs/heads/master 373181c57 -> 668a952ac
SAMZA-930: fix issue with json deserialisation in YarnUtil
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/668a952a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/668a952a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/668a952a
Branch: refs/heads/master
Commit: 668a952ac628eb1d9cac58ec21797de9be2b9747
Parents: 373181c
Author: Alex Buck <al...@gmail.com>
Authored: Mon May 2 11:06:21 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon May 2 11:06:21 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
.../samza/autoscaling/utils/YarnUtil.java | 12 +-
.../samza/autoscaling/utils/YarnUtilTest.java | 38 ++++++
.../resources/exampleResourceManagerOutput.json | 121 +++++++++++++++++++
4 files changed, 168 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c15b8e7..9afab88 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -30,6 +30,7 @@
<allow pkg="org.mockito" />
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka" />
+ <allow pkg="org.apache.commons" />
<subpackage name="config">
<allow class="org.apache.samza.SamzaException" />
http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
index 376c549..cab46b9 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
@@ -74,11 +74,8 @@ public class YarnUtil {
String applications = EntityUtils.toString(httpResponse.getEntity());
log.debug("applications: " + applications);
- ObjectMapper mapper = new ObjectMapper();
- Map<String, Map<String, List<Map<String, String>>>> yarnApplications = mapper.readValue(applications, new TypeReference<Map<String, Map<String, List<Map<String, String>>>>>() {
- });
+ List<Map<String, String>> applicationList = parseYarnApplications(applications);
String name = jobName + "_" + jobID;
- List<Map<String, String>> applicationList = yarnApplications.get("apps").get("app");
for (Map<String, String> application : applicationList) {
if (application.containsKey("state") && application.containsKey("name") && application.containsKey("id")) {
if (application.get("state").toString().equals("RUNNING") && application.get("name").toString().equals(name)) {
@@ -94,6 +91,13 @@ public class YarnUtil {
return null;
}
+ List<Map<String, String>> parseYarnApplications(String applications) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, Map<String, List<Map<String, String>>>> yarnApplications = mapper.readValue(applications, new TypeReference<Map<String, Map<String, List<Map<String, Object>>>>>() {
+ });
+ return yarnApplications.get("apps").get("app");
+ }
+
/**
* This function returns the state of a given application. This state can be on of the
* {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"}
http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
new file mode 100644
index 0000000..97ccb2d
--- /dev/null
+++ b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.samza.autoscaling.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class YarnUtilTest {
+
+ @Test
+ public void handleJsonArraysAsWellAsStrings() throws IOException {
+ YarnUtil yarnUtil = new YarnUtil("rm", 0);
+ List<Map<String, String>> applications = yarnUtil.parseYarnApplications(IOUtils.toString(getClass().getClassLoader().getResourceAsStream("exampleResourceManagerOutput.json")));
+ assertEquals("RUNNING", applications.get(0).get("state"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
new file mode 100644
index 0000000..9f8a025
--- /dev/null
+++ b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json
@@ -0,0 +1,121 @@
+{
+ "apps": {
+ "app": [
+ {
+ "id": "application_1459790549146_0003",
+ "user": "root",
+ "name": "protodeserializer_1",
+ "queue": "default",
+ "state": "RUNNING",
+ "finalStatus": "UNDEFINED",
+ "progress": 0,
+ "trackingUI": "ApplicationMaster",
+ "trackingUrl": "http://yarnrm:8088/proxy/application_1459790549146_0003/",
+ "diagnostics": "",
+ "clusterId": 1459790549146,
+ "applicationType": "Samza",
+ "applicationTags": "",
+ "startedTime": 1459792852675,
+ "finishedTime": 0,
+ "elapsedTime": 738921,
+ "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0003_01_000001/root",
+ "amHostHttpAddress": "yarnnm:8042",
+ "allocatedMB": 1024,
+ "allocatedVCores": 2,
+ "runningContainers": 2,
+ "memorySeconds": 749045,
+ "vcoreSeconds": 1462,
+ "preemptedResourceMB": 0,
+ "preemptedResourceVCores": 0,
+ "numNonAMContainerPreempted": 0,
+ "numAMContainerPreempted": 0,
+ "resourceRequests": [
+ {
+ "capability": {
+ "memory": 512,
+ "virtualCores": 1
+ },
+ "nodeLabelExpression": "",
+ "numContainers": 0,
+ "priority": {
+ "priority": 0
+ },
+ "relaxLocality": true,
+ "resourceName": "*"
+ },
+ {
+ "capability": {
+ "memory": 512,
+ "virtualCores": 1
+ },
+ "nodeLabelExpression": "",
+ "numContainers": 0,
+ "priority": {
+ "priority": 0
+ },
+ "relaxLocality": true,
+ "resourceName": "/default-rack"
+ }
+ ]
+ },
+ {
+ "id": "application_1459790549146_0002",
+ "user": "root",
+ "name": "protodeserializer_1",
+ "queue": "default",
+ "state": "KILLED",
+ "finalStatus": "KILLED",
+ "progress": 100,
+ "trackingUI": "History",
+ "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0002",
+ "diagnostics": "Application killed by user.",
+ "clusterId": 1459790549146,
+ "applicationType": "Samza",
+ "applicationTags": "",
+ "startedTime": 1459791820396,
+ "finishedTime": 1459792284264,
+ "elapsedTime": 463868,
+ "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0002_01_000001/root",
+ "amHostHttpAddress": "yarnnm:8042",
+ "allocatedMB": -1,
+ "allocatedVCores": -1,
+ "runningContainers": -1,
+ "memorySeconds": 462177,
+ "vcoreSeconds": 902,
+ "preemptedResourceMB": 0,
+ "preemptedResourceVCores": 0,
+ "numNonAMContainerPreempted": 0,
+ "numAMContainerPreempted": 0
+ },
+ {
+ "id": "application_1459790549146_0001",
+ "user": "root",
+ "name": "protodeserializer_1",
+ "queue": "default",
+ "state": "KILLED",
+ "finalStatus": "KILLED",
+ "progress": 100,
+ "trackingUI": "History",
+ "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0001",
+ "diagnostics": "Application killed by user.",
+ "clusterId": 1459790549146,
+ "applicationType": "Samza",
+ "applicationTags": "",
+ "startedTime": 1459791108916,
+ "finishedTime": 1459791813659,
+ "elapsedTime": 704743,
+ "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0001_01_000001/root",
+ "amHostHttpAddress": "yarnnm:8042",
+ "allocatedMB": -1,
+ "allocatedVCores": -1,
+ "runningContainers": -1,
+ "memorySeconds": 711605,
+ "vcoreSeconds": 1389,
+ "preemptedResourceMB": 0,
+ "preemptedResourceVCores": 0,
+ "numNonAMContainerPreempted": 0,
+ "numAMContainerPreempted": 0
+ }
+ ]
+ }
+}
\ No newline at end of file