You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 09:06:30 UTC
incubator-eagle git commit: [EAGLE-823] Refactor zkroot and lockpath
to solve the problem that occurred when adding the zookeeper lock
Repository: incubator-eagle
Updated Branches:
refs/heads/master 9f4e7633d -> 5da9df822
[EAGLE-823] Refactor zkroot and lockpath to solve the problem that occurred when adding the zookeeper lock
EAGLE-823 Refactor zkroot and lockpath to solve the problem that occurred when adding the zookeeper lock
- Refactor zkroot and lockpath to a uniform structure.
- Add unit test for eagle-security-hive.
https://issues.apache.org/jira/browse/EAGLE-823
Author: chitin <ch...@gmail.com>
Closes #713 from chitin/EAGLE-823.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5da9df82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5da9df82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5da9df82
Branch: refs/heads/master
Commit: 5da9df8221a51252b5e91127d20dea3165553b68
Parents: 9f4e763
Author: chitin <ch...@gmail.com>
Authored: Tue Dec 6 17:06:18 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 17:06:18 2016 +0800
----------------------------------------------------------------------
.../jpm/mr/running/MRRunningJobConfig.java | 6 +-
.../jpm/mr/running/parser/MRJobParserTest.java | 4 +-
.../spark/running/SparkRunningJobAppConfig.java | 3 +-
.../java/org/apache/eagle/jpm/util/Utils.java | 2 +-
.../org/apache/eagle/jpm/util/UtilsTest.java | 4 +-
...HiveJobRunningSourcedStormSpoutProvider.java | 5 +-
.../hive/jobrunning/TestHiveJobFetchSpout.java | 93 ++++++++++++++++++++
7 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 119867d..f733b95 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -32,6 +32,8 @@ public class MRRunningJobConfig implements Serializable {
private static final String ZK_ROOT_PREFIX = "/apps/mr/running";
+ private static final String JOB_SYMBOL = "/jobs";
+
public ZKStateConfig getZkStateConfig() {
return zkStateConfig;
}
@@ -109,8 +111,8 @@ public class MRRunningJobConfig implements Serializable {
this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
- this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
- this.zkStateConfig.zkLockPath = Utils.makeLockPath(this.zkStateConfig.zkRoot);
+ this.zkStateConfig.zkLockPath = Utils.makeLockPath(ZK_ROOT_PREFIX + "/" + config.getString("siteId"));
+ this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId") + JOB_SYMBOL;
// parse eagle service endpoint
this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 3a71384..f2e581c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -63,8 +63,8 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
@PrepareForTest({InputStreamUtils.class, MRJobParser.class, URLConnectionUtils.class, Math.class, MRJobEntityCreationHandler.class})
@PowerMockIgnore({"javax.*", "org.w3c.*", "com.sun.org.apache.xerces.*","org.apache.xerces.*"})
public class MRJobParserTest {
- private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784/job_1479206441898_30784";
- private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784";
+ private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784/job_1479206441898_30784";
+ private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784";
private static final String JOB_CONF_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/conf?anonymous=true";
private static final String JOB_COUNT_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/counters?anonymous=true";
private static final String JOB_ID = "job_1479206441898_30784";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index c5ec6ce..b48784f 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -34,6 +34,7 @@ public class SparkRunningJobAppConfig implements Serializable {
static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
static final String DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT = "/apps/spark/running";
+ static final String JOB_SYMBOL = "/jobs";
ZKStateConfig getZkStateConfig() {
return zkStateConfig;
@@ -120,8 +121,8 @@ public class SparkRunningJobAppConfig implements Serializable {
this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
- this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
this.zkStateConfig.zkLockPath = Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId"));
+ this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId") + JOB_SYMBOL;
if (config.hasPath("zookeeper.zkRoot")) {
this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 7fb6643..9025d36 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -116,6 +116,6 @@ public class Utils {
public static String makeLockPath(String zkrootWithSiteId) {
Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank");
- return "/locks" + zkrootWithSiteId.toLowerCase();
+ return zkrootWithSiteId.toLowerCase() + "/locks"; // lock node becomes a child of the lower-cased site root instead of living under a top-level /locks tree
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index 75d46e9..19c0f00 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -67,8 +67,8 @@ public class UtilsTest {
@Test
public void testMakeLockPath() {
- String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
- Assert.assertEquals("/locks/apps/mr/running/sitdid", lockpath);
+ String lockpath = Utils.makeLockPath("/apps/mr/jobs/sitdId");
+ Assert.assertEquals("/apps/mr/jobs/sitdid/locks", lockpath);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index c8b1f61..597593b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
public class HiveJobRunningSourcedStormSpoutProvider {
private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
+ private static final String JOB_SYMBOL = "/jobs";
public BaseRichSpout getSpout(Config config, int parallelism) {
RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
@@ -59,11 +60,11 @@ public class HiveJobRunningSourcedStormSpoutProvider {
//controlConfig.numTotalPartitions = parallelism == null ? 1 : parallelism;
ZKStateConfig zkStateConfig = new ZKStateConfig();
zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
- zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+ zkStateConfig.zkLockPath = Utils.makeLockPath(config.getString("dataSourceConfig.zkRoot") + "/" + config.getString("siteId"));
+ zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot") + "/" + config.getString("siteId") + JOB_SYMBOL;
zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + "/" + config.getString("siteId"));
RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
new file mode 100644
index 0000000..d674e02
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.junit.*;
+import static org.mockito.Mockito.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Exercises HiveJobFetchSpout#open, obtained via HiveJobRunningSourcedStormSpoutProvider, against a Curator TestingServer (since 12/5/16).
+ */
+public class TestHiveJobFetchSpout {
+
+ private static TestingServer zk; // ZooKeeper test server shared by every test in this class
+ private static com.typesafe.config.Config config; // provider configuration, built once in setupZookeeper()
+ private static CuratorFramework curator; // client used to seed and inspect the ZooKeeper state
+ private static final String SHARE_RESOURCES = "/apps/hive/running/sanbox/jobs/0/lastFinishTime"; // zkRoot/siteId/jobs as configured below, followed by 0/lastFinishTime (presumably the state node of partition 0; confirm against the spout)
+
+ @BeforeClass
+ public static void setupZookeeper() throws Exception {
+ zk = new TestingServer();
+ curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+ curator.start();
+ curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(SHARE_RESOURCES); // pre-create the node and its parents so setData() in setDefaultValues() has a target
+ config = ConfigFactory.parseMap(new HashMap<String, Object>() {{
+ put("dataSourceConfig.RMEndPoints", "http://server.eagle.apache.org:8088");
+ put("dataSourceConfig.HSEndPoint", "http://server.eagle.apache.org:19888");
+ put("dataSourceConfig.zkQuorum", zk.getConnectString()); // point the provider at the test server started above
+ put("dataSourceConfig.zkRoot", "/apps/hive/running");
+ put("dataSourceConfig.zkSessionTimeoutMs", 15000);
+ put("dataSourceConfig.zkRetryTimes", 3);
+ put("dataSourceConfig.zkRetryInterval", 2000);
+ put("dataSourceConfig.partitionerCls", "org.apache.eagle.job.DefaultJobPartitionerImpl");
+ put("siteId", "sanbox"); // must equal the site segment of SHARE_RESOURCES
+ }});
+ }
+
+ @AfterClass
+ public static void teardownZookeeper() throws Exception {
+ if(curator.checkExists().forPath(SHARE_RESOURCES) != null) {
+ curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES); // removes only the seeded node; its parent nodes are left in place
+ }
+ CloseableUtils.closeQuietly(curator);
+ CloseableUtils.closeQuietly(zk);
+ }
+
+ @Before
+ public void setDefaultValues() throws Exception {
+ curator.setData().forPath(SHARE_RESOURCES, String.valueOf(0).getBytes()); // reset the stored value to 0 before each test; getBytes() uses the platform default charset
+ }
+
+ @Test
+ public void testOpen() throws Exception {
+ Map conf = mock(HashMap.class);
+ TopologyContext context = mock(TopologyContext.class);
+ SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ when(context.getThisTaskId()).thenReturn(1);
+ when(context.getComponentTasks(anyString())).thenReturn(new ArrayList<Integer>() {{ // a single task (id 1) for any component name
+ add(1);
+ }});
+ HiveJobRunningSourcedStormSpoutProvider provider = new HiveJobRunningSourcedStormSpoutProvider();
+ HiveJobFetchSpout spout = (HiveJobFetchSpout)provider.getSpout(config, 1);
+ spout.open(conf, context, collector);
+ Long yesterday = Long.valueOf(new String(curator.getData().forPath(SHARE_RESOURCES))); // value found at the node after open(); throws NumberFormatException if it is not a decimal long
+ Assert.assertTrue(System.currentTimeMillis() > yesterday); // NOTE(review): also holds for the initial 0 written by setDefaultValues(), so this alone does not prove open() updated the node
+ }
+}