Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/25 13:52:49 UTC

incubator-eagle git commit: [EAGLE-800] Use InterProcessMutex to sync operation in RunningJobManager

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 6a0529e96 -> 4d4d8c0ea


[EAGLE-800] Use InterProcessMutex to sync operation in RunningJobManager

 - Use InterProcessMutex to sync operations in RunningJobManager (a short usage sketch follows below).
 - Use siteId to generate the lock path for InterProcessMutex.
 - Fix some checkstyle problems.
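
A minimal illustration (not part of this patch) of the pattern the change introduces: the lock path is derived from the
per-site zkRoot (as Utils.makeLockPath does), and ZooKeeper mutations are wrapped in acquire/release on a shared
InterProcessMutex. The quorum address and znode paths below are placeholders, assuming a reachable ZooKeeper ensemble.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class LockPathSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder values; the real quorum, siteId and zkRoot come from application config.
            String zkQuorum = "localhost:2181";
            String zkRoot = "/apps/mr/running/sandbox";          // ZK_ROOT_PREFIX + "/" + siteId
            String lockPath = zkRoot.toLowerCase() + "/locks";   // equivalent of Utils.makeLockPath(zkRoot)

            CuratorFramework curator = CuratorFrameworkFactory.newClient(zkQuorum, new ExponentialBackoffRetry(1000, 3));
            curator.start();

            // One mutex per manager instance; every writer for the same site uses the same lock path,
            // so concurrent update()/delete() calls against the state tree are serialized.
            InterProcessMutex lock = new InterProcessMutex(curator, lockPath);
            String jobPath = zkRoot + "/yarnAppId/jobId";        // node a delete() call would target
            lock.acquire();
            try {
                if (curator.checkExists().forPath(jobPath) != null) {
                    curator.delete().deletingChildrenIfNeeded().forPath(jobPath);
                }
            } finally {
                lock.release();
            }
            curator.close();
        }
    }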

https://issues.apache.org/jira/browse/EAGLE-800

Author: r7raul1984 <ta...@yhd.com>

Closes #684 from r7raul1984/EAGLE-800.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4d4d8c0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4d4d8c0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4d4d8c0e

Branch: refs/heads/master
Commit: 4d4d8c0eafa06d537a307291df03f344cdae6ef5
Parents: 6a0529e
Author: r7raul1984 <ta...@yhd.com>
Authored: Fri Nov 25 21:52:40 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Nov 25 21:52:40 2016 +0800

----------------------------------------------------------------------
 .../impl/storm/zookeeper/ZKStateConfig.java     |   1 +
 .../jpm/mr/running/MRRunningJobConfig.java      |   3 +
 .../mr/running/recover/MRRunningJobManager.java |   2 +-
 .../mr/running/storm/MRRunningJobParseBolt.java |   6 +-
 .../mr/running/MRRunningJobApplicationTest.java |  15 ++-
 .../jpm/mr/running/MRRunningJobManagerTest.java | 120 +++++++++++++++++++
 .../jpm/mr/running/parser/MRJobParserTest.java  |   2 +
 .../src/test/resources/mrconf_30784.xml         |  17 ++-
 .../spark/running/SparkRunningJobAppConfig.java |   3 +
 .../running/recover/SparkRunningJobManager.java |   2 +-
 .../running/storm/SparkRunningJobParseBolt.java |  14 ++-
 .../java/org/apache/eagle/jpm/util/Utils.java   |  12 +-
 .../jpm/util/jobrecover/RunningJobManager.java  |  35 +++---
 .../org/apache/eagle/jpm/util/UtilsTest.java    |  19 +++
 .../hive/HiveQueryMonitoringApplication.java    |  14 +--
 .../hive/config/RunningJobCrawlConfig.java      |   8 +-
 .../hive/jobrunning/HiveJobFetchSpout.java      |  21 ++--
 ...HiveJobRunningSourcedStormSpoutProvider.java |  53 ++++----
 18 files changed, 266 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
index f9515f5..53df455 100644
--- a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -25,4 +25,5 @@ public class ZKStateConfig implements Serializable {
     public int zkSessionTimeoutMs;
     public int zkRetryTimes;
     public int zkRetryInterval;
+    public String zkLockPath;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 975e821..119867d 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running;
 import org.apache.eagle.common.config.ConfigOptionParser;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,7 @@ public class MRRunningJobConfig implements Serializable {
         public int zkSessionTimeoutMs;
         public int zkRetryTimes;
         public int zkRetryInterval;
+        public String zkLockPath;
     }
 
     public static class EagleServiceConfig implements Serializable {
@@ -108,6 +110,7 @@ public class MRRunningJobConfig implements Serializable {
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
         this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
+        this.zkStateConfig.zkLockPath = Utils.makeLockPath(this.zkStateConfig.zkRoot);
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 20a8701..70e6fda 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -34,7 +34,7 @@ public class MRRunningJobManager implements Serializable {
 
     public MRRunningJobManager(MRRunningJobConfig.ZKStateConfig config) {
         this.runningJobManager = new RunningJobManager(config.zkQuorum,
-                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath);
     }
 
     public Map<String, JobExecutionAPIEntity> recoverYarnApp(String appId) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index e2767d8..8ec2dec 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -75,8 +75,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
 
     @Override
     public void execute(Tuple tuple) {
-        AppInfo appInfo = (AppInfo)tuple.getValue(1);
-        Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>)tuple.getValue(2);
+        AppInfo appInfo = (AppInfo) tuple.getValue(1);
+        Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>) tuple.getValue(2);
 
         LOG.info("get mr yarn application " + appInfo.getId());
 
@@ -100,7 +100,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
                 });
 
         if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
-            || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
+                || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
             applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING);
             executorService.execute(applicationParser);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 5d78a50..787c9ac 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -58,15 +58,18 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 @PowerMockIgnore({"javax.*"})
 public class MRRunningJobApplicationTest {
 
-    public static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true";
-    public static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]";
-    public static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]";
-    public static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]";
+    private static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true";
+    private static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]";
+    private static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]";
+    private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]";
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+    private static Config config = ConfigFactory.load();
+    private static String siteId;
 
     @BeforeClass
     public static void setupMapper() throws Exception {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+        siteId = config.getString("siteId");
     }
 
 
@@ -77,8 +80,6 @@ public class MRRunningJobApplicationTest {
         when(Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
 
 
-        MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
-        PowerMockito.whenNew(MRRunningJobManager.class).withArguments(any()).thenReturn(mrRunningJobManager);
         Config config = ConfigFactory.load();
         MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
         List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
@@ -88,6 +89,8 @@ public class MRRunningJobApplicationTest {
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
                 config);
+        MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
+        PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
         mrRunningJobParseBolt.prepare(null, null, null);
         InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
         AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
new file mode 100644
index 0000000..55f76e2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.zookeeper.CreateMode;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({MRRunningJobManager.class, RunningJobManager.class, LoggerFactory.class})
+@PowerMockIgnore({"javax.*"})
+public class MRRunningJobManagerTest {
+    private static TestingServer zk;
+    private static com.typesafe.config.Config config = ConfigFactory.load();
+    private static CuratorFramework curator;
+    private static final String SHARE_RESOURCES = "/apps/mr/running/sandbox/yarnAppId/jobId";
+    private static final int QTY = 5;
+    private static final int REPETITIONS = QTY * 10;
+    private static MRRunningJobConfig.EndpointConfig endpointConfig;
+    private static MRRunningJobConfig.ZKStateConfig zkStateConfig;
+    private static org.slf4j.Logger log = mock(org.slf4j.Logger.class);
+
+    @BeforeClass
+    public static void setupZookeeper() throws Exception {
+        zk = new TestingServer();
+        curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+        curator.start();
+        curator.create()
+                .creatingParentsIfNeeded()
+                .withMode(CreateMode.PERSISTENT)
+                .forPath(SHARE_RESOURCES);
+        MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+        zkStateConfig = mrRunningJobConfig.getZkStateConfig();
+        zkStateConfig.zkQuorum = zk.getConnectString();
+        endpointConfig = mrRunningJobConfig.getEndpointConfig();
+        mockStatic(LoggerFactory.class);
+        when(LoggerFactory.getLogger(any(Class.class))).thenReturn(log);
+    }
+
+    @AfterClass
+    public static void teardownZookeeper() throws Exception {
+        if (curator.checkExists().forPath(SHARE_RESOURCES) != null) {
+            curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
+        }
+        CloseableUtils.closeQuietly(curator);
+        CloseableUtils.closeQuietly(zk);
+    }
+
+    @Test
+    public void testMRRunningJobManagerDelWithLock() throws Exception {
+        Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
+
+        ExecutorService service = Executors.newFixedThreadPool(QTY);
+        for (int i = 0; i < QTY; ++i) {
+            Callable<Void> task = () -> {
+                try {
+                    MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
+                    for (int j = 0; j < REPETITIONS; ++j) {
+                        mrRunningJobManager.delete("yarnAppId", "jobId");
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // log or do something
+                }
+                return null;
+            };
+            service.submit(task);
+        }
+
+        service.shutdown();
+        service.awaitTermination(10, TimeUnit.MINUTES);
+        Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) == null);
+        verify(log, never()).error(anyString(), anyString(), anyString(), anyString(), any(Throwable.class));
+        verify(log, never()).error(anyString(), anyString(), anyString());
+        verify(log, never()).error(anyString(), any(Throwable.class));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 4b00bb2..561d858 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -72,6 +72,7 @@ public class MRJobParserTest {
     private static final String DATA_FROM_ZK = "{\"entityTags\":\"{\\\"jobName\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"jobId\\\":\\\"job_1479206441898_30784\\\",\\\"site\\\":\\\"sandbox\\\",\\\"jobDefId\\\":\\\"eagletest\\\",\\\"jobType\\\":\\\"HIVE\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\"}\",\"appInfo\":\"{\\\"applicationType\\\":\\\"MAPREDUCE\\\",\\\"startedTime\\\":\\\"1479328221694\\\",\\\"finalStatus\\\":\\\"UNDEFINED\\\",\\\"trackingUrl\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/proxy\\\\\\/application_1479206441898_30784\\\\\\/\\\",\\\"runningContainers\\\":\\\"2\\\",\\\"trackingUI\\\":\\\"ApplicationMaster\\\",\\\"clusterId\\\":\\\"1479206441898\\\",\\\"amContainerLogs\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/node\\\\\\/containerlogs\\\\\\/container_e11_1479206441898_30784_01_000001\\\\\\/xxx\\\",\\\"allocatedVCores\\\":\\\"2\\\",\\\"diagnostics\\\":\\\"\\\",\\\
 "name\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"progress\\\":\\\"95.0\\\",\\\"finishedTime\\\":\\\"0\\\",\\\"allocatedMB\\\":\\\"3072\\\",\\\"id\\\":\\\"application_1479206441898_30784\\\",\\\"state\\\":\\\"RUNNING\\\",\\\"amHostHttpAddress\\\":\\\"host.domain.com:8088\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\",\\\"elapsedTime\\\":\\\"13367402\\\"}\"}";
     private static TestingServer zk;
     private static String ZKROOT;
+    private static String siteId;
     private static MRRunningJobConfig mrRunningJobConfig;
     private static Config config = ConfigFactory.load();
     private static CuratorFramework curator;
@@ -83,6 +84,7 @@ public class MRJobParserTest {
         zk = new TestingServer();
         curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(1));
         mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+        siteId = mrRunningJobConfig.getEndpointConfig().site;
         mrRunningJobConfig.getZkStateConfig().zkQuorum = zk.getConnectString();
         ZKROOT = mrRunningJobConfig.getZkStateConfig().zkRoot;
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
index 78d61b5..66da734 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
@@ -1 +1,16 @@
-\ufeff<conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.
 xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf>
\ No newline at end of file
+\ufeff<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+--><conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.
 xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 3ae4a35..c5ec6ce 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.spark.running;
 
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +66,7 @@ public class SparkRunningJobAppConfig implements Serializable {
         public int zkRetryTimes;
         public int zkRetryInterval;
         public boolean recoverEnabled;
+        public String zkLockPath;
     }
 
     public static class EagleServiceConfig implements Serializable {
@@ -119,6 +121,7 @@ public class SparkRunningJobAppConfig implements Serializable {
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
+        this.zkStateConfig.zkLockPath = Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId"));
         if (config.hasPath("zookeeper.zkRoot")) {
             this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 3fb6371..4fbf53b 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -33,7 +33,7 @@ public class SparkRunningJobManager implements Serializable {
 
     public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) {
         this.runningJobManager = new RunningJobManager(config.zkQuorum,
-                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath);
     }
 
     public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 9c0ffef..0a74348 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -74,14 +74,20 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
     @SuppressWarnings("unchecked")
     @Override
     public void execute(Tuple tuple) {
-        AppInfo appInfo = (AppInfo)tuple.getValue(1);
-        Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>)tuple.getValue(2);
+        AppInfo appInfo = (AppInfo) tuple.getValue(1);
+        Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>) tuple.getValue(2);
 
         LOG.info("get spark yarn application " + appInfo.getId());
 
         SparkApplicationParser applicationParser;
         if (!runningSparkParsers.containsKey(appInfo.getId())) {
-            applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher);
+            applicationParser = new SparkApplicationParser(eagleServiceConfig,
+                    endpointConfig,
+                    jobExtractorConfig,
+                    appInfo,
+                    sparkApp,
+                    new SparkRunningJobManager(zkStateConfig),
+                    resourceFetcher);
             runningSparkParsers.put(appInfo.getId(), applicationParser);
             LOG.info("create application parser for {}", appInfo.getId());
         } else {
@@ -97,7 +103,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
                 });
 
         if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
-            || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+                || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
             applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
             executorService.execute(applicationParser);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 91077df..9025d36 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.util;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +29,6 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 public class Utils {
@@ -86,11 +87,11 @@ public class Utils {
             int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
             return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
         }
-        LOG.warn("Cannot parse memory info " +  memory);
+        LOG.warn("Cannot parse memory info " + memory);
 
         return 0L;
     }
-    
+
     public static Constants.JobType fetchJobType(Map config) {
         if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
             return Constants.JobType.CASCADING;
@@ -112,4 +113,9 @@ public class Utils {
         config.forEach(entry -> mapConfig.put(entry.getKey(), entry.getValue()));
         return fetchJobType(mapConfig);
     }
+
+    public static String makeLockPath(String zkrootWithSiteId) {
+        Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank");
+        return zkrootWithSiteId.toLowerCase() + "/locks";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index a2d97bf..1857707 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.util.jobrecover;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.CuratorFramework;
@@ -39,8 +41,9 @@ public class RunningJobManager implements Serializable {
     private static final String ENTITY_TAGS_KEY = "entityTags";
     private static final String APP_INFO_KEY = "appInfo";
     private static final String ZNODE_LAST_FINISH_TIME = "lastFinishTime";
+    private final InterProcessMutex lock;
 
-    private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) throws Exception {
+    private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) {
         return CuratorFrameworkFactory.newClient(
                 zkQuorum,
                 zkSessionTimeoutMs,
@@ -49,12 +52,17 @@ public class RunningJobManager implements Serializable {
         );
     }
 
-    public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot) {
+    public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot, String lockPath) {
         this.zkRoot = zkRoot;
-
+        curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval);
         try {
-            curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval);
             curator.start();
+        } catch (Exception e) {
+            LOG.error("curator start error {}", e);
+        }
+        LOG.info("InterProcessMutex lock path is " + lockPath);
+        lock = new InterProcessMutex(curator, lockPath);
+        try {
             if (curator.checkExists().forPath(this.zkRoot) == null) {
                 curator.create()
                         .creatingParentsIfNeeded()
@@ -142,7 +150,6 @@ public class RunningJobManager implements Serializable {
 
     public boolean update(String yarnAppId, String jobId, Map<String, String> tags, AppInfo app) {
         String path = this.zkRoot + "/" + yarnAppId + "/" + jobId;
-        //InterProcessMutex lock = new InterProcessMutex(curator, path);
         Map<String, String> appInfo = new HashMap<>();
         appInfo.put("id", app.getId());
         appInfo.put("user", app.getUser());
@@ -169,7 +176,7 @@ public class RunningJobManager implements Serializable {
         fields.put(ENTITY_TAGS_KEY, (new JSONObject(tags)).toString());
         fields.put(APP_INFO_KEY, (new JSONObject(appInfo)).toString());
         try {
-            //lock.acquire();
+            lock.acquire();
             JSONObject object = new JSONObject(fields);
             if (curator.checkExists().forPath(path) == null) {
                 curator.create()
@@ -183,7 +190,7 @@ public class RunningJobManager implements Serializable {
             LOG.error("failed to update job {} for yarn app {} ", jobId, yarnAppId);
         } finally {
             try {
-                //lock.release();
+                lock.release();
             } catch (Exception e) {
                 LOG.error("fail releasing lock", e);
             }
@@ -193,9 +200,8 @@ public class RunningJobManager implements Serializable {
 
     public void delete(String yarnAppId, String jobId) {
         String path = this.zkRoot + "/" + yarnAppId + "/" + jobId;
-        //InterProcessMutex lock = new InterProcessMutex(curator, path);
         try {
-            //lock.acquire();
+            lock.acquire();
             if (curator.checkExists().forPath(path) != null) {
                 curator.delete().deletingChildrenIfNeeded().forPath(path);
                 LOG.info("delete job {} for yarn app {}, path {} ", jobId, yarnAppId, path);
@@ -208,7 +214,7 @@ public class RunningJobManager implements Serializable {
             LOG.error("failed to delete job {} for yarn app {}, path {}, {}", jobId, yarnAppId, path, e);
         } finally {
             try {
-                //lock.release();
+                lock.release();
             } catch (Exception e) {
                 LOG.error("fail releasing lock", e);
 
@@ -218,9 +224,8 @@ public class RunningJobManager implements Serializable {
 
     public void delete(String yarnAppId) {
         String path = this.zkRoot + "/" + yarnAppId;
-        //InterProcessMutex lock = new InterProcessMutex(curator, path);
         try {
-            //lock.acquire();
+            lock.acquire();
             if (curator.checkExists().forPath(path) != null) {
                 curator.delete().forPath(path);
                 LOG.info("delete yarn app {}, path {} ", yarnAppId, path);
@@ -229,7 +234,7 @@ public class RunningJobManager implements Serializable {
             LOG.error("failed to delete yarn app {}, path {} ", yarnAppId, path);
         } finally {
             try {
-                //lock.release();
+                lock.release();
             } catch (Exception e) {
                 LOG.error("fail releasing lock", e);
             }
@@ -243,14 +248,14 @@ public class RunningJobManager implements Serializable {
         while (keysItr.hasNext()) {
             String key = keysItr.next();
             result.put(key, new HashMap<>());
-            String value = (String)object.get(key);
+            String value = (String) object.get(key);
 
             JSONObject jsonObject = new JSONObject(value);
             Map<String, String> items = result.get(key);
             Iterator<String> keyItemItr = jsonObject.keys();
             while (keyItemItr.hasNext()) {
                 String itemKey = keyItemItr.next();
-                items.put(itemKey, (String)jsonObject.get(itemKey));
+                items.put(itemKey, (String) jsonObject.get(itemKey));
             }
         }
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index b29a8e0..8e89edf 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -64,4 +64,23 @@ public class UtilsTest {
         thrown.expect(IllegalArgumentException.class);
         Utils.parseMemory("0.1g");
     }
+
+    @Test
+    public void testMakeLockPath() {
+        String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
+        Assert.assertEquals("/apps/mr/running/sitdid/locks", lockpath);
+    }
+
+    @Test
+    public void testMakeLockPath1() {
+        thrown.expect(IllegalArgumentException.class);
+        Utils.makeLockPath("");
+    }
+
+    @Test
+    public void testMakeLockPath2() {
+        thrown.expect(IllegalArgumentException.class);
+        Utils.makeLockPath(null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
index 6abea3d..7a4509b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
@@ -36,11 +36,11 @@ import org.apache.eagle.security.hive.sensitivity.HiveSensitivityDataEnrichBolt;
  * Since 8/11/16.
  */
 public class HiveQueryMonitoringApplication extends StormApplication {
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String FILTER_TASK_NUM = "topology.numOfFilterTasks";
-    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
-    public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
-    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+    private static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    private static final String FILTER_TASK_NUM = "topology.numOfFilterTasks";
+    private static final String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    private static final String JOIN_TASK_NUM = "topology.numOfJoinTasks";
+    private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
 
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
@@ -68,13 +68,13 @@ public class HiveQueryMonitoringApplication extends StormApplication {
         BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
         joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("user"));
 
-        StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream",config);
+        StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream", config);
         BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
         kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
         return builder.createTopology();
     }
 
-    public static void main(String[] args){
+    public static void main(String[] args) {
         Config config = ConfigFactory.load();
         HiveQueryMonitoringApplication app = new HiveQueryMonitoringApplication();
         app.run(config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
index 2662698..35df281 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
@@ -21,25 +21,25 @@ import org.apache.eagle.jpm.util.JobIdPartitioner;
 
 import java.io.Serializable;
 
-public class RunningJobCrawlConfig implements Serializable{
+public class RunningJobCrawlConfig implements Serializable {
     private static final long serialVersionUID = 1L;
     public RunningJobEndpointConfig endPointConfig;
     public ControlConfig controlConfig;
     public ZKStateConfig zkStateConfig;
 
-    public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){
+    public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig) {
         this.endPointConfig = endPointConfig;
         this.controlConfig = controlConfig;
         this.zkStateConfig = zkStateConfig;
     }
 
-    public static class RunningJobEndpointConfig implements Serializable{
+    public static class RunningJobEndpointConfig implements Serializable {
         private static final long serialVersionUID = 1L;
         public String[] RMBasePaths;
         public String HSBasePath;
     }
 
-    public static class ControlConfig implements Serializable{
+    public static class ControlConfig implements Serializable {
         private static final long serialVersionUID = 1L;
         public boolean jobConfigEnabled;
         public boolean jobInfoEnabled;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
index af4599b..5f54f30 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
@@ -94,26 +94,27 @@ public class HiveJobFetchSpout extends BaseRichSpout {
         this.partitionId = calculatePartitionId(context);
         // sanity verify 0<=partitionId<=numTotalPartitions-1
         if (partitionId < 0 || partitionId > crawlConfig.controlConfig.numTotalPartitions) {
-            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
-                partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions);
+            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
+                    + partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions);
         }
         Class<? extends JobIdPartitioner> partitionerCls = crawlConfig.controlConfig.partitionerCls;
         try {
             this.jobFilter = new JobIdFilterByPartition(partitionerCls.newInstance(),
-                crawlConfig.controlConfig.numTotalPartitions, partitionId);
+                    crawlConfig.controlConfig.numTotalPartitions, partitionId);
         } catch (Exception e) {
             LOG.error("failing instantiating job partitioner class " + partitionerCls.getCanonicalName());
             throw new IllegalStateException(e);
         }
         this.collector = collector;
         this.runningJobManager = new RunningJobManager(crawlConfig.zkStateConfig.zkQuorum,
-            crawlConfig.zkStateConfig.zkSessionTimeoutMs,
-            crawlConfig.zkStateConfig.zkRetryTimes,
-            crawlConfig.zkStateConfig.zkRetryInterval,
-            crawlConfig.zkStateConfig.zkRoot);
+                crawlConfig.zkStateConfig.zkSessionTimeoutMs,
+                crawlConfig.zkStateConfig.zkRetryTimes,
+                crawlConfig.zkStateConfig.zkRetryInterval,
+                crawlConfig.zkStateConfig.zkRoot,
+                crawlConfig.zkStateConfig.zkLockPath);
         this.lastFinishAppTime = this.runningJobManager.recoverLastFinishedTime(partitionId);
-        if (this.lastFinishAppTime == 0l) {
-            this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000l;//one day ago
+        if (this.lastFinishAppTime == 0L) {
+            this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000L;//one day ago
             this.runningJobManager.updateLastFinishTime(partitionId, this.lastFinishAppTime);
         }
     }
@@ -129,7 +130,7 @@ public class HiveJobFetchSpout extends BaseRichSpout {
             handleApps(apps, true);
 
             long fetchTime = Calendar.getInstance().getTimeInMillis();
-            if (fetchTime - this.lastFinishAppTime > 60000l) {
+            if (fetchTime - this.lastFinishAppTime > 60000L) {
                 apps = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(this.lastFinishAppTime));
                 if (apps == null) {
                     apps = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 71f5949..c8b1f61 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -18,6 +18,7 @@ package org.apache.eagle.security.hive.jobrunning;
 
 import backtype.storm.topology.base.BaseRichSpout;
 import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.util.Utils;
 import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;
 import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.ControlConfig;
 import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
@@ -27,19 +28,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HiveJobRunningSourcedStormSpoutProvider {
-	private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
 
-	public BaseRichSpout getSpout(Config config, int parallelism){
-		RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
-		String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints");
-		endPointConfig.RMBasePaths = RMEndPoints.split(",");
+    public BaseRichSpout getSpout(Config config, int parallelism) {
+        RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
+        String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints");
+        endPointConfig.RMBasePaths = RMEndPoints.split(",");
 
-		String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint");
-		endPointConfig.HSBasePath = HSEndPoint;
+        String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint");
+        endPointConfig.HSBasePath = HSEndPoint;
 
-		ControlConfig controlConfig = new ControlConfig();
-		controlConfig.jobInfoEnabled = true;
-		controlConfig.jobConfigEnabled = true;
+        ControlConfig controlConfig = new ControlConfig();
+        controlConfig.jobInfoEnabled = true;
+        controlConfig.jobConfigEnabled = true;
         controlConfig.numTotalPartitions = parallelism <= 0 ? 1 : parallelism;
 
         boolean zkCleanupTimeSet = config.hasPath("dataSourceConfig.zkCleanupTimeInday");
@@ -56,24 +57,24 @@ public class HiveJobRunningSourcedStormSpoutProvider {
         controlConfig.sizeOfJobCompletedInfoQueue = sizeOfJobCompletedInfoQueue ? config.getInt("dataSourceConfig.sizeOfJobCompletedInfoQueue") : 10000;
 
         //controlConfig.numTotalPartitions = parallelism == null ? 1 : parallelism;
-		ZKStateConfig zkStateConfig = new ZKStateConfig();
-		zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-		zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
-		zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-		zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
-		zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-		RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
+        ZKStateConfig zkStateConfig = new ZKStateConfig();
+        zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+        zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + "/" + config.getString("siteId"));
+        RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
 
-		try{
-			controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls"));
-		}
-		catch(Exception ex){
-			LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
-			//throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
+        try {
+            controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+        } catch (Exception ex) {
+            LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
+            //throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
             controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
         }
 
-		HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig);
-		return spout;
-	}
+        HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig);
+        return spout;
+    }
 }