You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/02 08:00:28 UTC
incubator-eagle git commit: [EAGLE-805] sync some operation in
RunningJobManager
Repository: incubator-eagle
Updated Branches:
refs/heads/master e6c4b0a73 -> aef7ea36c
[EAGLE-805] sync some operation in RunningJobManager
EAGLE-805 sync some operation in RunningJobManager
- Acquire the lock when the recoverYarnApp and recover methods are invoked.
- Add unit tests for the two methods under high concurrency.
- Modify lockPath (now "/locks" + zkRoot instead of zkRoot + "/locks") to avoid an exception when calling curator.getChildren().
https://issues.apache.org/jira/browse/EAGLE-805
Author: chitin <ch...@gmail.com>
Closes #700 from chitin/EAGLE-805.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/aef7ea36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/aef7ea36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/aef7ea36
Branch: refs/heads/master
Commit: aef7ea36c1473b29f76420dd67d646e738f7d10b
Parents: e6c4b0a
Author: chitin <ch...@gmail.com>
Authored: Fri Dec 2 16:00:23 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Dec 2 16:00:23 2016 +0800
----------------------------------------------------------------------
.../jpm/mr/running/MRRunningJobManagerTest.java | 112 +++++++++++++++++--
.../src/test/resources/jobInfo_805.json | 4 +
.../java/org/apache/eagle/jpm/util/Utils.java | 2 +-
.../jpm/util/jobrecover/RunningJobManager.java | 105 ++++++++++-------
.../org/apache/eagle/jpm/util/UtilsTest.java | 2 +-
5 files changed, 170 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aef7ea36/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
index 55f76e2..4c52e10 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
@@ -24,18 +24,21 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
import org.apache.zookeeper.CreateMode;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -60,16 +63,14 @@ public class MRRunningJobManagerTest {
private static MRRunningJobConfig.EndpointConfig endpointConfig;
private static MRRunningJobConfig.ZKStateConfig zkStateConfig;
private static org.slf4j.Logger log = mock(org.slf4j.Logger.class);
+ private static final int BUFFER_SIZE = 4096;
+ private static final String LOCKS_BASE_PATH = "/locks";
@BeforeClass
public static void setupZookeeper() throws Exception {
zk = new TestingServer();
curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
curator.start();
- curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(SHARE_RESOURCES);
MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
zkStateConfig = mrRunningJobConfig.getZkStateConfig();
zkStateConfig.zkQuorum = zk.getConnectString();
@@ -80,14 +81,33 @@ public class MRRunningJobManagerTest {
@AfterClass
public static void teardownZookeeper() throws Exception {
+ CloseableUtils.closeQuietly(curator);
+ CloseableUtils.closeQuietly(zk);
+ }
+
+ @Before
+ public void createPath() throws Exception {
+ if(curator.checkExists().forPath(SHARE_RESOURCES) == null) {
+ curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(SHARE_RESOURCES);
+ }
+ }
+
+ @After
+ public void cleanPath() throws Exception {
if (curator.checkExists().forPath(SHARE_RESOURCES) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
}
- CloseableUtils.closeQuietly(curator);
- CloseableUtils.closeQuietly(zk);
+ if (curator.checkExists().forPath(LOCKS_BASE_PATH) != null) {
+ curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(LOCKS_BASE_PATH);
+ }
}
+
@Test
+ @Ignore
public void testMRRunningJobManagerDelWithLock() throws Exception {
Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
@@ -100,7 +120,6 @@ public class MRRunningJobManagerTest {
mrRunningJobManager.delete("yarnAppId", "jobId");
}
} catch (Exception e) {
- e.printStackTrace();
// log or do something
}
return null;
@@ -117,4 +136,75 @@ public class MRRunningJobManagerTest {
}
+ @Test
+ @Ignore
+ public void testMRRunningJobManagerRecoverYarnAppWithLock() throws Exception {
+ Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
+ curator.setData().forPath(SHARE_RESOURCES, generateZkSetData());
+ ExecutorService service = Executors.newFixedThreadPool(QTY);
+ for (int i = 0; i < QTY; ++i) {
+ Callable<Void> task = () -> {
+ try {
+ MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
+ for (int j = 0; j < REPETITIONS; ++j) {
+ if(j % 3 == 0) {
+ mrRunningJobManager.delete("yarnAppId", "jobId");
+ } else {
+ mrRunningJobManager.recoverYarnApp("yarnAppId");
+ }
+ }
+ } catch (Exception e) {
+ // log or do something
+ }
+ return null;
+ };
+ service.submit(task);
+ }
+
+ service.shutdown();
+ service.awaitTermination(10, TimeUnit.MINUTES);
+ verify(log, never()).error(anyString(), any(Throwable.class));
+ }
+
+ @Test
+ public void testMRRunningJobManagerRecoverWithLock() throws Exception {
+ Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
+ curator.setData().forPath(SHARE_RESOURCES, generateZkSetData());
+ ExecutorService service = Executors.newFixedThreadPool(QTY);
+ for (int i = 0; i < QTY; ++i) {
+ Callable<Void> task = () -> {
+ try {
+ MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
+ for (int j = 0; j < REPETITIONS; ++j) {
+ if(j % 3 == 0) {
+ mrRunningJobManager.delete("yarnAppId", "jobId");
+ } else {
+ mrRunningJobManager.recover();
+ }
+ }
+ } catch (Exception e) {
+ // log or do something
+ }
+ return null;
+ };
+ service.submit(task);
+ }
+
+ service.shutdown();
+ service.awaitTermination(10, TimeUnit.MINUTES);
+ verify(log, never()).error(anyString(), any(Throwable.class));
+ }
+
+ private byte[] generateZkSetData() throws IOException {
+ InputStream jsonstream = this.getClass().getResourceAsStream("/jobInfo_805.json");
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] data = new byte[BUFFER_SIZE];
+ int count = -1;
+ while((count = jsonstream.read(data, 0, BUFFER_SIZE)) != -1) {
+ outputStream.write(data, 0, count);
+ }
+ data = null;
+ return outputStream.toByteArray();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aef7ea36/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobInfo_805.json
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobInfo_805.json b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobInfo_805.json
new file mode 100644
index 0000000..7c19ade
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobInfo_805.json
@@ -0,0 +1,4 @@
+{
+ "entityTags": "{\"jobName\":\"UserProfileCollectorRedisMR_Redis_Job\",\"jobId\":\"job_1479206441898_271161\",\"site\":\"sandbox\",\"jobDefId\":\"UserProfileCollectorRedisMR_Redis_Job\",\"jobType\":\"N\\/A\",\"user\":\"eagle\",\"queue\":\"eagle\"}",
+ "appInfo": "{\"applicationType\":\"MAPREDUCE\",\"startedTime\":\"1480330000169\",\"finalStatus\":\"UNDEFINED\",\"trackingUrl\":\"http:\\/\\/host.domain.com:8088\\/proxy\\/application_1479206441898_271161\\/\",\"runningContainers\":\"1\",\"trackingUI\":\"ApplicationMaster\",\"clusterId\":\"1479206441898\",\"amContainerLogs\":\"http:\\/\\/host.domain.com:8042\\/node\\/containerlogs\\/container_e11_1479206441898_271161_01_000001\\/eagle\",\"allocatedVCores\":\"1\",\"diagnostics\":\"\",\"name\":\"UserProfileCollectorRedisMR_Redis_Job\",\"progress\":\"0.0\",\"finishedTime\":\"0\",\"allocatedMB\":\"1024\",\"id\":\"application_1479206441898_271161\",\"state\":\"RUNNING\",\"amHostHttpAddress\":\"host.domain.com:8042\",\"user\":\"eagle\",\"queue\":\"eagle\",\"elapsedTime\":\"6292\"}"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aef7ea36/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 9025d36..7fb6643 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -116,6 +116,6 @@ public class Utils {
public static String makeLockPath(String zkrootWithSiteId) {
Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank");
- return zkrootWithSiteId.toLowerCase() + "/locks";
+ return "/locks" + zkrootWithSiteId.toLowerCase();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aef7ea36/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 1857707..95c531c 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -18,7 +18,6 @@
package org.apache.eagle.jpm.util.jobrecover;
-import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.commons.lang3.tuple.Pair;
@@ -30,7 +29,6 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.Serializable;
import java.util.*;
@@ -77,50 +75,66 @@ public class RunningJobManager implements Serializable {
public Map<String, Pair<Map<String, String>, AppInfo>> recoverYarnApp(String yarnAppId) throws Exception {
Map<String, Pair<Map<String, String>, AppInfo>> result = new HashMap<>();
String path = this.zkRoot + "/" + yarnAppId;
- List<String> jobIds = curator.getChildren().forPath(path);
- if (jobIds.size() == 0) {
- LOG.info("delete empty path {}", path);
- delete(yarnAppId);
- }
-
- for (String jobId : jobIds) {
- String jobPath = path + "/" + jobId;
- LOG.info("recover path {}", jobPath);
- String fields = new String(curator.getData().forPath(jobPath), "UTF-8");
- if (fields.length() == 0) {
- //LOG.info("delete empty path {}", jobPath);
- //delete(yarnAppId, jobId);
- continue;
+ try {
+ lock.acquire();
+ if (curator.checkExists().forPath(path) == null) {
+ return result;
}
- JSONObject object = new JSONObject(fields);
- Map<String, Map<String, String>> parseResult = parse(object);
+ List<String> jobIds = curator.getChildren().forPath(path);
+ if (jobIds.size() == 0) {
+ LOG.info("delete empty path {}", path);
+ delete(yarnAppId);
+ }
+
+ for (String jobId : jobIds) {
+ String jobPath = path + "/" + jobId;
+ LOG.info("recover path {}", jobPath);
+ String fields = new String(curator.getData().forPath(jobPath), "UTF-8");
+ if (fields.length() == 0) {
+ //LOG.info("delete empty path {}", jobPath);
+ //delete(yarnAppId, jobId);
+ continue;
+ }
+ JSONObject object = new JSONObject(fields);
+ Map<String, Map<String, String>> parseResult = parse(object);
- Map<String, String> appInfoMap = parseResult.get(APP_INFO_KEY);
- AppInfo appInfo = new AppInfo();
- appInfo.setId(appInfoMap.get("id"));
- appInfo.setUser(appInfoMap.get("user"));
- appInfo.setName(appInfoMap.get("name"));
- appInfo.setQueue(appInfoMap.get("queue"));
- appInfo.setState(appInfoMap.get("state"));
- appInfo.setFinalStatus(appInfoMap.get("finalStatus"));
- appInfo.setProgress(Double.parseDouble(appInfoMap.get("progress")));
- appInfo.setTrackingUI(appInfoMap.get("trackingUI"));
- appInfo.setDiagnostics(appInfoMap.get("diagnostics"));
- appInfo.setTrackingUrl(appInfoMap.get("trackingUrl"));
- appInfo.setClusterId(appInfoMap.get("clusterId"));
- appInfo.setApplicationType(appInfoMap.get("applicationType"));
- appInfo.setStartedTime(Long.parseLong(appInfoMap.get("startedTime")));
- appInfo.setFinishedTime(Long.parseLong(appInfoMap.get("finishedTime")));
- appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime")));
- appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs"));
- appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress"));
- appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB")));
- appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
- appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));
+ Map<String, String> appInfoMap = parseResult.get(APP_INFO_KEY);
+ AppInfo appInfo = new AppInfo();
+ appInfo.setId(appInfoMap.get("id"));
+ appInfo.setUser(appInfoMap.get("user"));
+ appInfo.setName(appInfoMap.get("name"));
+ appInfo.setQueue(appInfoMap.get("queue"));
+ appInfo.setState(appInfoMap.get("state"));
+ appInfo.setFinalStatus(appInfoMap.get("finalStatus"));
+ appInfo.setProgress(Double.parseDouble(appInfoMap.get("progress")));
+ appInfo.setTrackingUI(appInfoMap.get("trackingUI"));
+ appInfo.setDiagnostics(appInfoMap.get("diagnostics"));
+ appInfo.setTrackingUrl(appInfoMap.get("trackingUrl"));
+ appInfo.setClusterId(appInfoMap.get("clusterId"));
+ appInfo.setApplicationType(appInfoMap.get("applicationType"));
+ appInfo.setStartedTime(Long.parseLong(appInfoMap.get("startedTime")));
+ appInfo.setFinishedTime(Long.parseLong(appInfoMap.get("finishedTime")));
+ appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime")));
+ appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs"));
+ appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress"));
+ appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB")));
+ appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
+ appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));
- Map<String, String> tags = parseResult.get(ENTITY_TAGS_KEY);
- result.put(jobId, Pair.of(tags, appInfo));
+ Map<String, String> tags = parseResult.get(ENTITY_TAGS_KEY);
+ result.put(jobId, Pair.of(tags, appInfo));
+ }
+ } catch (Exception e) {
+ LOG.error("fail to recoverYarnApp", e);
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ lock.release();
+ } catch (Exception e) {
+ LOG.error("fail releasing lock", e);
+ }
}
+
return result;
}
@@ -133,6 +147,7 @@ public class RunningJobManager implements Serializable {
//<yarnAppId, <jobId, Pair<<Map<String, String>, AppInfo>>>
Map<String, Map<String, Pair<Map<String, String>, AppInfo>>> result = new HashMap<>();
try {
+ lock.acquire();
List<String> yarnAppIds = curator.getChildren().forPath(this.zkRoot);
for (String yarnAppId : yarnAppIds) {
if (!result.containsKey(yarnAppId)) {
@@ -144,6 +159,12 @@ public class RunningJobManager implements Serializable {
} catch (Exception e) {
LOG.error("fail to recover", e);
throw new RuntimeException(e);
+ } finally {
+ try {
+ lock.release();
+ } catch (Exception e) {
+ LOG.error("fail releasing lock", e);
+ }
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aef7ea36/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index 8e89edf..75d46e9 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -68,7 +68,7 @@ public class UtilsTest {
@Test
public void testMakeLockPath() {
String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
- Assert.assertEquals("/apps/mr/running/sitdid/locks", lockpath);
+ Assert.assertEquals("/locks/apps/mr/running/sitdid", lockpath);
}
@Test