You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 09:06:30 UTC

incubator-eagle git commit: [EAGLE-823] Refactor zkroot and lockpath to solve the problem that occurred when adding the zookeeper lock

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9f4e7633d -> 5da9df822


[EAGLE-823] Refactor zkroot and lockpath to solve the problem that occurred when adding the zookeeper lock

EAGLE-823 Refactor zkroot and lockpath to solve the problem that occurred when adding the zookeeper lock

- Refactor zkroot and lockpath to a uniform structure.
- Add unit test for eagle-security-hive.

https://issues.apache.org/jira/browse/EAGLE-823

Author: chitin <ch...@gmail.com>

Closes #713 from chitin/EAGLE-823.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5da9df82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5da9df82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5da9df82

Branch: refs/heads/master
Commit: 5da9df8221a51252b5e91127d20dea3165553b68
Parents: 9f4e763
Author: chitin <ch...@gmail.com>
Authored: Tue Dec 6 17:06:18 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 17:06:18 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/running/MRRunningJobConfig.java      |  6 +-
 .../jpm/mr/running/parser/MRJobParserTest.java  |  4 +-
 .../spark/running/SparkRunningJobAppConfig.java |  3 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   |  2 +-
 .../org/apache/eagle/jpm/util/UtilsTest.java    |  4 +-
 ...HiveJobRunningSourcedStormSpoutProvider.java |  5 +-
 .../hive/jobrunning/TestHiveJobFetchSpout.java  | 93 ++++++++++++++++++++
 7 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 119867d..f733b95 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -32,6 +32,8 @@ public class MRRunningJobConfig implements Serializable {
 
     private static final String ZK_ROOT_PREFIX = "/apps/mr/running";
 
+    private static final String JOB_SYMBOL = "/jobs";
+
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -109,8 +111,8 @@ public class MRRunningJobConfig implements Serializable {
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
-        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
-        this.zkStateConfig.zkLockPath = Utils.makeLockPath(this.zkStateConfig.zkRoot);
+        this.zkStateConfig.zkLockPath = Utils.makeLockPath(ZK_ROOT_PREFIX + "/" + config.getString("siteId"));
+        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId") + JOB_SYMBOL;
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 3a71384..f2e581c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -63,8 +63,8 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 @PrepareForTest({InputStreamUtils.class, MRJobParser.class, URLConnectionUtils.class, Math.class, MRJobEntityCreationHandler.class})
 @PowerMockIgnore({"javax.*", "org.w3c.*", "com.sun.org.apache.xerces.*","org.apache.xerces.*"})
 public class MRJobParserTest {
-    private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784/job_1479206441898_30784";
-    private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784";
+    private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784/job_1479206441898_30784";
+    private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/jobs/application_1479206441898_30784";
     private static final String JOB_CONF_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/conf?anonymous=true";
     private static final String JOB_COUNT_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/counters?anonymous=true";
     private static final String JOB_ID = "job_1479206441898_30784";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index c5ec6ce..b48784f 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -34,6 +34,7 @@ public class SparkRunningJobAppConfig implements Serializable {
     static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
 
     static final String DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT = "/apps/spark/running";
+    static final String JOB_SYMBOL = "/jobs";
 
     ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
@@ -120,8 +121,8 @@ public class SparkRunningJobAppConfig implements Serializable {
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
         this.zkStateConfig.zkLockPath = Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId"));
+        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId") + JOB_SYMBOL;
         if (config.hasPath("zookeeper.zkRoot")) {
             this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 7fb6643..9025d36 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -116,6 +116,6 @@ public class Utils {
 
     public static String makeLockPath(String zkrootWithSiteId) {
         Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank");
-        return "/locks" + zkrootWithSiteId.toLowerCase();
+        return zkrootWithSiteId.toLowerCase() + "/locks";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
index 75d46e9..19c0f00 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java
@@ -67,8 +67,8 @@ public class UtilsTest {
 
     @Test
     public void testMakeLockPath() {
-        String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId");
-        Assert.assertEquals("/locks/apps/mr/running/sitdid", lockpath);
+        String lockpath = Utils.makeLockPath("/apps/mr/jobs/sitdId");
+        Assert.assertEquals("/apps/mr/jobs/sitdid/locks", lockpath);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index c8b1f61..597593b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 public class HiveJobRunningSourcedStormSpoutProvider {
     private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class);
+    private static final String JOB_SYMBOL = "/jobs";
 
     public BaseRichSpout getSpout(Config config, int parallelism) {
         RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig();
@@ -59,11 +60,11 @@ public class HiveJobRunningSourcedStormSpoutProvider {
         //controlConfig.numTotalPartitions = parallelism == null ? 1 : parallelism;
         ZKStateConfig zkStateConfig = new ZKStateConfig();
         zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+        zkStateConfig.zkLockPath = Utils.makeLockPath(config.getString("dataSourceConfig.zkRoot") + "/" + config.getString("siteId"));
+        zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot") + "/" + config.getString("siteId") + JOB_SYMBOL;
         zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
         zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
         zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-        zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + "/" + config.getString("siteId"));
         RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5da9df82/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
new file mode 100644
index 0000000..d674e02
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/test/java/org/apache/eagle/security/hive/jobrunning/TestHiveJobFetchSpout.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.junit.*;
+import static org.mockito.Mockito.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Since 12/5/16.
+ */
+public class TestHiveJobFetchSpout {
+
+    private static TestingServer zk;
+    private static com.typesafe.config.Config config;
+    private static CuratorFramework curator;
+    private static final String SHARE_RESOURCES = "/apps/hive/running/sanbox/jobs/0/lastFinishTime";
+
+    @BeforeClass
+    public static void setupZookeeper() throws Exception {
+        zk = new TestingServer();
+        curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+        curator.start();
+        curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(SHARE_RESOURCES);
+        config = ConfigFactory.parseMap(new HashMap<String, Object>() {{
+            put("dataSourceConfig.RMEndPoints", "http://server.eagle.apache.org:8088");
+            put("dataSourceConfig.HSEndPoint", "http://server.eagle.apache.org:19888");
+            put("dataSourceConfig.zkQuorum", zk.getConnectString());
+            put("dataSourceConfig.zkRoot", "/apps/hive/running");
+            put("dataSourceConfig.zkSessionTimeoutMs", 15000);
+            put("dataSourceConfig.zkRetryTimes", 3);
+            put("dataSourceConfig.zkRetryInterval", 2000);
+            put("dataSourceConfig.partitionerCls", "org.apache.eagle.job.DefaultJobPartitionerImpl");
+            put("siteId", "sanbox");
+        }});
+    }
+
+    @AfterClass
+    public static void teardownZookeeper() throws Exception {
+        if(curator.checkExists().forPath(SHARE_RESOURCES) != null) {
+            curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES);
+        }
+        CloseableUtils.closeQuietly(curator);
+        CloseableUtils.closeQuietly(zk);
+    }
+
+    @Before
+    public void setDefaultValues() throws Exception {
+        curator.setData().forPath(SHARE_RESOURCES, String.valueOf(0).getBytes());
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        Map conf = mock(HashMap.class);
+        TopologyContext context = mock(TopologyContext.class);
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        when(context.getThisTaskId()).thenReturn(1);
+        when(context.getComponentTasks(anyString())).thenReturn(new ArrayList<Integer>() {{
+            add(1);
+        }});
+        HiveJobRunningSourcedStormSpoutProvider provider = new HiveJobRunningSourcedStormSpoutProvider();
+        HiveJobFetchSpout spout = (HiveJobFetchSpout)provider.getSpout(config, 1);
+        spout.open(conf, context, collector);
+        Long yesterday = Long.valueOf(new String(curator.getData().forPath(SHARE_RESOURCES)));
+        Assert.assertTrue(System.currentTimeMillis() > yesterday);
+    }
+}