You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/25 13:52:49 UTC
incubator-eagle git commit: [EAGLE-800] Use InterProcessMutex to sync
operation in RunningJobManager
Repository: incubator-eagle
Updated Branches:
refs/heads/master 6a0529e96 -> 4d4d8c0ea
[EAGLE-800] Use InterProcessMutex to sync operation in RunningJobManager
- Use InterProcessMutex to sync operation in RunningJobManager.
- Use siteId to generate lockpath for InterProcessMutex.
- Fix some checkstyle problem.
https://issues.apache.org/jira/browse/EAGLE-800
Author: r7raul1984 <ta...@yhd.com>
Closes #684 from r7raul1984/EAGLE-800.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4d4d8c0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4d4d8c0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4d4d8c0e
Branch: refs/heads/master
Commit: 4d4d8c0eafa06d537a307291df03f344cdae6ef5
Parents: 6a0529e
Author: r7raul1984 <ta...@yhd.com>
Authored: Fri Nov 25 21:52:40 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Nov 25 21:52:40 2016 +0800
----------------------------------------------------------------------
.../impl/storm/zookeeper/ZKStateConfig.java | 1 +
.../jpm/mr/running/MRRunningJobConfig.java | 3 +
.../mr/running/recover/MRRunningJobManager.java | 2 +-
.../mr/running/storm/MRRunningJobParseBolt.java | 6 +-
.../mr/running/MRRunningJobApplicationTest.java | 15 ++-
.../jpm/mr/running/MRRunningJobManagerTest.java | 120 +++++++++++++++++++
.../jpm/mr/running/parser/MRJobParserTest.java | 2 +
.../src/test/resources/mrconf_30784.xml | 17 ++-
.../spark/running/SparkRunningJobAppConfig.java | 3 +
.../running/recover/SparkRunningJobManager.java | 2 +-
.../running/storm/SparkRunningJobParseBolt.java | 14 ++-
.../java/org/apache/eagle/jpm/util/Utils.java | 12 +-
.../jpm/util/jobrecover/RunningJobManager.java | 35 +++---
.../org/apache/eagle/jpm/util/UtilsTest.java | 19 +++
.../hive/HiveQueryMonitoringApplication.java | 14 +--
.../hive/config/RunningJobCrawlConfig.java | 8 +-
.../hive/jobrunning/HiveJobFetchSpout.java | 21 ++--
...HiveJobRunningSourcedStormSpoutProvider.java | 53 ++++----
18 files changed, 266 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
index f9515f5..53df455 100644
--- a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
+++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -25,4 +25,5 @@ public class ZKStateConfig implements Serializable {
public int zkSessionTimeoutMs;
public int zkRetryTimes;
public int zkRetryInterval;
+ public String zkLockPath;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 975e821..119867d 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running;
import org.apache.eagle.common.config.ConfigOptionParser;
import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class MRRunningJobConfig implements Serializable {
public int zkSessionTimeoutMs;
public int zkRetryTimes;
public int zkRetryInterval;
+ public String zkLockPath;
}
public static class EagleServiceConfig implements Serializable {
@@ -108,6 +110,7 @@ public class MRRunningJobConfig implements Serializable {
this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
+ this.zkStateConfig.zkLockPath = Utils.makeLockPath(this.zkStateConfig.zkRoot);
// parse eagle service endpoint
this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 20a8701..70e6fda 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -34,7 +34,7 @@ public class MRRunningJobManager implements Serializable {
public MRRunningJobManager(MRRunningJobConfig.ZKStateConfig config) {
this.runningJobManager = new RunningJobManager(config.zkQuorum,
- config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+ config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath);
}
public Map<String, JobExecutionAPIEntity> recoverYarnApp(String appId) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index e2767d8..8ec2dec 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -75,8 +75,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- AppInfo appInfo = (AppInfo)tuple.getValue(1);
- Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>)tuple.getValue(2);
+ AppInfo appInfo = (AppInfo) tuple.getValue(1);
+ Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>) tuple.getValue(2);
LOG.info("get mr yarn application " + appInfo.getId());
@@ -100,7 +100,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
});
if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
- || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
+ || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING);
executorService.execute(applicationParser);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 5d78a50..787c9ac 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -58,15 +58,18 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
@PowerMockIgnore({"javax.*"})
public class MRRunningJobApplicationTest {
- public static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true";
- public static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]";
- public static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]";
- public static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]";
+ private static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true";
+ private static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]";
+ private static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]";
+ private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]";
private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+ private static Config config = ConfigFactory.load();
+ private static String siteId;
@BeforeClass
public static void setupMapper() throws Exception {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ siteId = config.getString("siteId");
}
@@ -77,8 +80,6 @@ public class MRRunningJobApplicationTest {
when(Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
- MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
- PowerMockito.whenNew(MRRunningJobManager.class).withArguments(any()).thenReturn(mrRunningJobManager);
Config config = ConfigFactory.load();
MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
@@ -88,6 +89,8 @@ public class MRRunningJobApplicationTest {
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys,
config);
+ MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
+ PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
mrRunningJobParseBolt.prepare(null, null, null);
InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
new file mode 100644
index 0000000..55f76e2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.zookeeper.CreateMode;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({MRRunningJobManager.class, RunningJobManager.class, LoggerFactory.class})
+@PowerMockIgnore({"javax.*"})
+public class MRRunningJobManagerTest {
+ private static TestingServer zk;
+ private static com.typesafe.config.Config config = ConfigFactory.load();
+ private static CuratorFramework curator;
+ private static final String SHARE_RESOURCES = "/apps/mr/running/sandbox/yarnAppId/jobId";
+ private static final int QTY = 5;
+ private static final int REPETITIONS = QTY * 10;
+ private static MRRunningJobConfig.EndpointConfig endpointConfig;
+ private static MRRunningJobConfig.ZKStateConfig zkStateConfig;
+ private static org.slf4j.Logger log = mock(org.slf4j.Logger.class);
+
+ @BeforeClass
+ public static void setupZookeeper() throws Exception {
+ zk = new TestingServer();
+ curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+ curator.start();
+ curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(SHARE_RESOURCES);
+ MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+ zkStateConfig = mrRunningJobConfig.getZkStateConfig();
+ zkStateConfig.zkQuorum = zk.getConnectString();
+ endpointConfig = mrRunningJobConfig.getEndpointConfig();
+ mockStatic(LoggerFactory.class);
+ when(LoggerFactory.getLogger(any(Class.class))).thenReturn(log);
+ }
+
+ @AfterClass
+ public static void teardownZookeeper() throws Exception {
+ if (curator.checkExists().forPath(SHARE_RESOURCES) != null) {
+ curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
+ }
+ CloseableUtils.closeQuietly(curator);
+ CloseableUtils.closeQuietly(zk);
+ }
+
+ @Test
+ public void testMRRunningJobManagerDelWithLock() throws Exception {
+ Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
+
+ ExecutorService service = Executors.newFixedThreadPool(QTY);
+ for (int i = 0; i < QTY; ++i) {
+ Callable<Void> task = () -> {
+ try {
+ MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
+ for (int j = 0; j < REPETITIONS; ++j) {
+ mrRunningJobManager.delete("yarnAppId", "jobId");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ // log or do something
+ }
+ return null;
+ };
+ service.submit(task);
+ }
+
+ service.shutdown();
+ service.awaitTermination(10, TimeUnit.MINUTES);
+ Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) == null);
+ verify(log, never()).error(anyString(), anyString(), anyString(), anyString(), any(Throwable.class));
+ verify(log, never()).error(anyString(), anyString(), anyString());
+ verify(log, never()).error(anyString(), any(Throwable.class));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 4b00bb2..561d858 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -72,6 +72,7 @@ public class MRJobParserTest {
private static final String DATA_FROM_ZK = "{\"entityTags\":\"{\\\"jobName\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"jobId\\\":\\\"job_1479206441898_30784\\\",\\\"site\\\":\\\"sandbox\\\",\\\"jobDefId\\\":\\\"eagletest\\\",\\\"jobType\\\":\\\"HIVE\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\"}\",\"appInfo\":\"{\\\"applicationType\\\":\\\"MAPREDUCE\\\",\\\"startedTime\\\":\\\"1479328221694\\\",\\\"finalStatus\\\":\\\"UNDEFINED\\\",\\\"trackingUrl\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/proxy\\\\\\/application_1479206441898_30784\\\\\\/\\\",\\\"runningContainers\\\":\\\"2\\\",\\\"trackingUI\\\":\\\"ApplicationMaster\\\",\\\"clusterId\\\":\\\"1479206441898\\\",\\\"amContainerLogs\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/node\\\\\\/containerlogs\\\\\\/container_e11_1479206441898_30784_01_000001\\\\\\/xxx\\\",\\\"allocatedVCores\\\":\\\"2\\\",\\\"diagnostics\\\":\\\"\\\",\\\
"name\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"progress\\\":\\\"95.0\\\",\\\"finishedTime\\\":\\\"0\\\",\\\"allocatedMB\\\":\\\"3072\\\",\\\"id\\\":\\\"application_1479206441898_30784\\\",\\\"state\\\":\\\"RUNNING\\\",\\\"amHostHttpAddress\\\":\\\"host.domain.com:8088\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\",\\\"elapsedTime\\\":\\\"13367402\\\"}\"}";
private static TestingServer zk;
private static String ZKROOT;
+ private static String siteId;
private static MRRunningJobConfig mrRunningJobConfig;
private static Config config = ConfigFactory.load();
private static CuratorFramework curator;
@@ -83,6 +84,7 @@ public class MRJobParserTest {
zk = new TestingServer();
curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(1));
mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+ siteId = mrRunningJobConfig.getEndpointConfig().site;
mrRunningJobConfig.getZkStateConfig().zkQuorum = zk.getConnectString();
ZKROOT = mrRunningJobConfig.getZkStateConfig().zkRoot;
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
index 78d61b5..66da734 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
@@ -1 +1,16 @@
-\ufeff<conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.
xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf>
\ No newline at end of file
+\ufeff<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+--><conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.
xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 3ae4a35..c5ec6ce 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.spark.running;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class SparkRunningJobAppConfig implements Serializable {
public int zkRetryTimes;
public int zkRetryInterval;
public boolean recoverEnabled;
+ public String zkLockPath;
}
public static class EagleServiceConfig implements Serializable {
@@ -119,6 +121,7 @@ public class SparkRunningJobAppConfig implements Serializable {
this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
+ this.zkStateConfig.zkLockPath = Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId"));
if (config.hasPath("zookeeper.zkRoot")) {
this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 3fb6371..4fbf53b 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -33,7 +33,7 @@ public class SparkRunningJobManager implements Serializable {
public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) {
this.runningJobManager = new RunningJobManager(config.zkQuorum,
- config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+ config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath);
}
public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 9c0ffef..0a74348 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -74,14 +74,20 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
- AppInfo appInfo = (AppInfo)tuple.getValue(1);
- Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>)tuple.getValue(2);
+ AppInfo appInfo = (AppInfo) tuple.getValue(1);
+ Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>) tuple.getValue(2);
LOG.info("get spark yarn application " + appInfo.getId());
SparkApplicationParser applicationParser;
if (!runningSparkParsers.containsKey(appInfo.getId())) {
- applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher);
+ applicationParser = new SparkApplicationParser(eagleServiceConfig,
+ endpointConfig,
+ jobExtractorConfig,
+ appInfo,
+ sparkApp,
+ new SparkRunningJobManager(zkStateConfig),
+ resourceFetcher);
runningSparkParsers.put(appInfo.getId(), applicationParser);
LOG.info("create application parser for {}", appInfo.getId());
} else {
@@ -97,7 +103,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
});
if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
- || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+ || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
executorService.execute(applicationParser);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 91077df..9025d36 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -18,6 +18,8 @@
package org.apache.eagle.jpm.util;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,7 +29,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
public class Utils {
@@ -86,11 +87,11 @@ public class Utils {
int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
}
- LOG.warn("Cannot parse memory info " + memory);
+ LOG.warn("Cannot parse memory info " + memory);
return 0L;
}
-
+
public static Constants.JobType fetchJobType(Map config) {
if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
return Constants.JobType.CASCADING;
@@ -112,4 +113,9 @@ public class Utils {
config.forEach(entry -> mapConfig.put(entry.getKey(), entry.getValue()));
return fetchJobType(mapConfig);
}
+
+ public static String makeLockPath(String zkrootWithSiteId) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank");
+ return zkrootWithSiteId.toLowerCase() + "/locks";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index a2d97bf..1857707 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -18,6 +18,8 @@
package org.apache.eagle.jpm.util.jobrecover;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
@@ -39,8 +41,9 @@ public class RunningJobManager implements Serializable {
private static final String ENTITY_TAGS_KEY = "entityTags";
private static final String APP_INFO_KEY = "appInfo";
private static final String ZNODE_LAST_FINISH_TIME = "lastFinishTime";
+ private final InterProcessMutex lock;
- private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) throws Exception {
+ private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) {
return CuratorFrameworkFactory.newClient(
zkQuorum,
zkSessionTimeoutMs,
@@ -49,12 +52,17 @@ public class RunningJobManager implements Serializable {
);
}
- public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot) {
+ public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot, String lockPath) {
this.zkRoot = zkRoot;
-
+ curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval);
try {
- curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval);
curator.start();
+ } catch (Exception e) {
+ LOG.error("curator start error {}", e);
+ }
+ LOG.info("InterProcessMutex lock path is " + lockPath);
+ lock = new InterProcessMutex(curator, lockPath);
+ try {
if (curator.checkExists().forPath(this.zkRoot) == null) {
curator.create()
.creatingParentsIfNeeded()
@@ -142,7 +150,6 @@ public class RunningJobManager implements Serializable {
public boolean update(String yarnAppId, String jobId, Map<String, String> tags, AppInfo app) {
String path = this.zkRoot + "/" + yarnAppId + "/" + jobId;
- //InterProcessMutex lock = new InterProcessMutex(curator, path);
Map<String, String> appInfo = new HashMap<>();
appInfo.put("id", app.getId());
appInfo.put("user", app.getUser());
@@ -169,7 +176,7 @@ public class RunningJobManager implements Serializable {
fields.put(ENTITY_TAGS_KEY, (new JSONObject(tags)).toString());
fields.put(APP_INFO_KEY, (new JSONObject(appInfo)).toString());
try {
- //lock.acquire();
+ lock.acquire();
JSONObject object = new JSONObject(fields);
if (curator.checkExists().forPath(path) == null) {
curator.create()
@@ -183,7 +190,7 @@ public class RunningJobManager implements Serializable {
LOG.error("failed to update job {} for yarn app {} ", jobId, yarnAppId);
} finally {
try {
- //lock.release();
+ lock.release();
} catch (Exception e) {
LOG.error("fail releasing lock", e);
}
@@ -193,9 +200,8 @@ public class RunningJobManager implements Serializable {
public void delete(String yarnAppId, String jobId) {
String path = this.zkRoot + "/" + yarnAppId + "/" + jobId;
- //InterProcessMutex lock = new InterProcessMutex(curator, path);
try {
- //lock.acquire();
+ lock.acquire();
if (curator.checkExists().forPath(path) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(path);
LOG.info("delete job {} for yarn app {}, path {} ", jobId, yarnAppId, path);
@@ -208,7 +214,7 @@ public class RunningJobManager implements Serializable {
LOG.error("failed to delete job {} for yarn app {}, path {}, {}", jobId, yarnAppId, path, e);
} finally {
try {
- //lock.release();
+ lock.release();
} catch (Exception e) {
LOG.error("fail releasing lock", e);
@@ -218,9 +224,8 @@ public class RunningJobManager implements Serializable {
public void delete(String yarnAppId) {
String path = this.zkRoot + "/" + yarnAppId;
- //InterProcessMutex lock = new InterProcessMutex(curator, path);
try {
- //lock.acquire();
+ lock.acquire();
if (curator.checkExists().forPath(path) != null) {
curator.delete().forPath(path);
LOG.info("delete yarn app {}, path {} ", yarnAppId, path);
@@ -229,7 +234,7 @@ public class RunningJobManager implements Serializable {
LOG.error("failed to delete yarn app {}, path {} ", yarnAppId, path);
} finally {
try {
- //lock.release();
+ lock.release();
} catch (Exception e) {
LOG.error("fail releasing lock", e);
}
@@ -243,14 +248,14 @@ public class RunningJobManager implements Serializable {
while (keysItr.hasNext()) {
String key = keysItr.next();
result.put(key, new HashMap<>());
- String value = (String)object.get(key);
+ String value = (String) object.get(key);
JSONObject jsonObject = new JSONObject(value);
Map<String, String> items = result.get(key);
Iterator<String> keyItemItr = jsonObject.keys();
while (keyItemItr.hasNext()) {
String itemKey = keyItemItr.next();
- items.put(itemKey, (String)jsonObject.get(itemKey));
+ items.put(itemKey, (String) jsonObject.get(itemKey));
}
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index b29a8e0..8e89edf 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -64,4 +64,23 @@ public class UtilsTest {
thrown.expect(IllegalArgumentException.class);
Utils.parseMemory("0.1g");
}
+
+ @Test
+ public void testMakeLockPath() {
+ String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
+ Assert.assertEquals("/apps/mr/running/sitdid/locks", lockpath);
+ }
+
+ @Test
+ public void testMakeLockPath1() {
+ thrown.expect(IllegalArgumentException.class);
+ Utils.makeLockPath("");
+ }
+
+ @Test
+ public void testMakeLockPath2() {
+ thrown.expect(IllegalArgumentException.class);
+ Utils.makeLockPath(null);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
index 6abea3d..7a4509b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
@@ -36,11 +36,11 @@ import org.apache.eagle.security.hive.sensitivity.HiveSensitivityDataEnrichBolt;
* Since 8/11/16.
*/
public class HiveQueryMonitoringApplication extends StormApplication {
- public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
- public final static String FILTER_TASK_NUM = "topology.numOfFilterTasks";
- public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
- public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
- public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+ private static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ private static final String FILTER_TASK_NUM = "topology.numOfFilterTasks";
+ private static final String PARSER_TASK_NUM = "topology.numOfParserTasks";
+ private static final String JOIN_TASK_NUM = "topology.numOfJoinTasks";
+ private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
@@ -68,13 +68,13 @@ public class HiveQueryMonitoringApplication extends StormApplication {
BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("user"));
- StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream",config);
+ StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream", config);
BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
return builder.createTopology();
}
- public static void main(String[] args){
+ public static void main(String[] args) {
Config config = ConfigFactory.load();
HiveQueryMonitoringApplication app = new HiveQueryMonitoringApplication();
app.run(config);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
index 2662698..35df281 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
@@ -21,25 +21,25 @@ import org.apache.eagle.jpm.util.JobIdPartitioner;
import java.io.Serializable;
-public class RunningJobCrawlConfig implements Serializable{
+public class RunningJobCrawlConfig implements Serializable {
private static final long serialVersionUID = 1L;
public RunningJobEndpointConfig endPointConfig;
public ControlConfig controlConfig;
public ZKStateConfig zkStateConfig;
- public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){
+ public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig) {
this.endPointConfig = endPointConfig;
this.controlConfig = controlConfig;
this.zkStateConfig = zkStateConfig;
}
- public static class RunningJobEndpointConfig implements Serializable{
+ public static class RunningJobEndpointConfig implements Serializable {
private static final long serialVersionUID = 1L;
public String[] RMBasePaths;
public String HSBasePath;
}
- public static class ControlConfig implements Serializable{
+ public static class ControlConfig implements Serializable {
private static final long serialVersionUID = 1L;
public boolean jobConfigEnabled;
public boolean jobInfoEnabled;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
index af4599b..5f54f30 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
@@ -94,26 +94,27 @@ public class HiveJobFetchSpout extends BaseRichSpout {
this.partitionId = calculatePartitionId(context);
// sanity verify 0<=partitionId<=numTotalPartitions-1
if (partitionId < 0 || partitionId > crawlConfig.controlConfig.numTotalPartitions) {
- throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
- partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions);
+ throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
+ + partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions);
}
Class<? extends JobIdPartitioner> partitionerCls = crawlConfig.controlConfig.partitionerCls;
try {
this.jobFilter = new JobIdFilterByPartition(partitionerCls.newInstance(),
- crawlConfig.controlConfig.numTotalPartitions, partitionId);
+ crawlConfig.controlConfig.numTotalPartitions, partitionId);
} catch (Exception e) {
LOG.error("failing instantiating job partitioner class " + partitionerCls.getCanonicalName());
throw new IllegalStateException(e);
}
this.collector = collector;
this.runningJobManager = new RunningJobManager(crawlConfig.zkStateConfig.zkQuorum,
- crawlConfig.zkStateConfig.zkSessionTimeoutMs,
- crawlConfig.zkStateConfig.zkRetryTimes,
- crawlConfig.zkStateConfig.zkRetryInterval,
- crawlConfig.zkStateConfig.zkRoot);
+ crawlConfig.zkStateConfig.zkSessionTimeoutMs,
+ crawlConfig.zkStateConfig.zkRetryTimes,
+ crawlConfig.zkStateConfig.zkRetryInterval,
+ crawlConfig.zkStateConfig.zkRoot,
+ crawlConfig.zkStateConfig.zkLockPath);
this.lastFinishAppTime = this.runningJobManager.recoverLastFinishedTime(partitionId);
- if (this.lastFinishAppTime == 0l) {
- this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000l;//one day ago
+ if (this.lastFinishAppTime == 0L) {
+ this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000L;//one day ago
this.runningJobManager.updateLastFinishTime(partitionId, this.lastFinishAppTime);
}
}
@@ -129,7 +130,7 @@ public class HiveJobFetchSpout extends BaseRichSpout {
handleApps(apps, true);
long fetchTime = Calendar.getInstance().getTimeInMillis();
- if (fetchTime - this.lastFinishAppTime > 60000l) {
+ if (fetchTime - this.lastFinishAppTime > 60000L) {
apps = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(this.lastFinishAppTime));
if (apps == null) {
apps = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 71f5949..c8b1f61 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -18,6 +18,7 @@ package org.apache.eagle.security.hive.jobrunning;
import backtype.storm.topology.base.BaseRichSpout;
import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.util.Utils;
import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;
import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.ControlConfig;
import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
@@ -27,19 +28,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveJobRunningSourcedStormSpoutProvider {
- private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
- public BaseRichSpout getSpout(Config config, int parallelism){
- RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
- String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints");
- endPointConfig.RMBasePaths = RMEndPoints.split(",");
+ public BaseRichSpout getSpout(Config config, int parallelism) {
+ RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
+ String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints");
+ endPointConfig.RMBasePaths = RMEndPoints.split(",");
- String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint");
- endPointConfig.HSBasePath = HSEndPoint;
+ String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint");
+ endPointConfig.HSBasePath = HSEndPoint;
- ControlConfig controlConfig = new ControlConfig();
- controlConfig.jobInfoEnabled = true;
- controlConfig.jobConfigEnabled = true;
+ ControlConfig controlConfig = new ControlConfig();
+ controlConfig.jobInfoEnabled = true;
+ controlConfig.jobConfigEnabled = true;
controlConfig.numTotalPartitions = parallelism <= 0 ? 1 : parallelism;
boolean zkCleanupTimeSet = config.hasPath("dataSourceConfig.zkCleanupTimeInday");
@@ -56,24 +57,24 @@ public class HiveJobRunningSourcedStormSpoutProvider {
controlConfig.sizeOfJobCompletedInfoQueue = sizeOfJobCompletedInfoQueue ? config.getInt("dataSourceConfig.sizeOfJobCompletedInfoQueue") : 10000;
//controlConfig.numTotalPartitions = parallelism == null ? 1 : parallelism;
- ZKStateConfig zkStateConfig = new ZKStateConfig();
- zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
- zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
- zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
- zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
- zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
+ ZKStateConfig zkStateConfig = new ZKStateConfig();
+ zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+ zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+ zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+ zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+ zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+ zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + "/" + config.getString("siteId"));
+ RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
- try{
- controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls"));
- }
- catch(Exception ex){
- LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
- //throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
+ try {
+ controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+ } catch (Exception ex) {
+ LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
+ //throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
}
- HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig);
- return spout;
- }
+ HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig);
+ return spout;
+ }
}