You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/15 09:44:55 UTC

incubator-eagle git commit: [EAGLE-770] remove InterProcessMutex from JobHistoryZKStateManager

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 8bb32c012 -> 446c22720


[EAGLE-770] remove InterProcessMutex from JobHistoryZKStateManager

Author: wujinhu <wu...@126.com>

Closes #655 from wujinhu/EAGLE-770.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/446c2272
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/446c2272
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/446c2272

Branch: refs/heads/master
Commit: 446c22720d930bc832fc5f3e32addf72115429a4
Parents: 8bb32c0
Author: wujinhu <wu...@126.com>
Authored: Tue Nov 15 17:44:47 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 15 17:44:47 2016 +0800

----------------------------------------------------------------------
 .../history/crawler/JHFCrawlerDriverImpl.java   |   6 +-
 .../jpm/mr/history/storm/JobHistorySpout.java   |   2 +-
 .../mr/history/zkres/JobHistoryZKStateLCM.java  |  47 ------
 .../history/zkres/JobHistoryZKStateManager.java | 143 +++++--------------
 4 files changed, 42 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 9c9374d..2c9dc8f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -250,9 +250,11 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     private void clearProcessedJob(Calendar cal) {
         // clear all already processed jobs some days before current processing date (PROCESSED_JOB_KEEP_DAYS)
         cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
-        String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+        String date = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
                 cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
-        JobHistoryZKStateManager.instance().truncateProcessedJob(line);
+        if (partitionId == 0) {
+            JobHistoryZKStateManager.instance().truncateProcessedJob(date);
+        }
     }
 
     private boolean isToday() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 436dbeb..9743ea7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -139,7 +139,7 @@ public class JobHistorySpout extends BaseRichSpout {
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(new DefaultJobIdPartitioner(), numTotalPartitions, partitionId);
         JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig());
-        JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
+        JobHistoryZKStateManager.instance().ensureJobPartition(partitionId, numTotalPartitions);
         interceptor.setSpoutOutputCollector(collector);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
deleted file mode 100644
index 0adcffe..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.zkres;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.List;
-
-public interface JobHistoryZKStateLCM {
-    void ensureJobPartitions(int numTotalPartitions);
-
-    String readProcessedDate(int partitionId);
-
-    List<String> readProcessedJobs(String date);
-
-    void updateProcessedDate(int partitionId, String date);
-
-    void addProcessedJob(String date, String jobId);
-
-    void updateProcessedJob(String date, String jobId, String status);
-
-    void truncateProcessedJob(String date);
-
-    void truncateEverything();
-
-    long readProcessedTimeStamp(int partitionId);
-
-    void updateProcessedTimeStamp(int partitionId, long timeStamp);
-
-    List<Pair<String, String>> getProcessedJobs(String date);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 78a906a..e063301 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -23,7 +23,6 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
@@ -34,12 +33,10 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
 
-public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
+public class JobHistoryZKStateManager {
     public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
     private String zkRoot;
     private CuratorFramework curator;
-    public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
-    public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
     public static final String ZNODE_PARTITIONS = "partitions";
     public static final String ZNODE_JOBS = "jobs";
     public static final String ZNODE_JOB_IDS = "jobIds";
@@ -69,7 +66,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
             curator = newCurator(config);
             curator.start();
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            LOG.warn("curator already started, {}", e);
         }
     }
 
@@ -78,29 +75,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         curator = null;
     }
 
-    private String readForceStartFrom() {
-        String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
-        try {
-            if (curator.checkExists().forPath(path) != null) {
-                return new String(curator.getData().forPath(path), "UTF-8");
-            }
-        } catch (Exception ex) {
-            LOG.error("fail reading forceStartFrom znode", ex);
-        }
-        return null;
-    }
-
-    private void deleteForceStartFrom() {
-        String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
-        try {
-            if (curator.checkExists().forPath(path) != null) {
-                curator.delete().forPath(path);
-            }
-        } catch (Exception ex) {
-            LOG.error("fail reading forceStartFrom znode", ex);
-        }
-    }
-
     private String getProcessedDateAfterBackoff(int backOffDays) {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
         Calendar c = Calendar.getInstance();
@@ -135,86 +109,61 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
      * </p>
      * .
      */
-    @Override
-    public void ensureJobPartitions(int numTotalPartitions) {
+    public void ensureJobPartition(int partitionId, int numTotalPartitions) {
         // lock before rebuild job partitions
-        String lockForEnsureJobPartitions = zkRoot + "/" + ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS;
-        InterProcessMutex lock = new InterProcessMutex(curator, lockForEnsureJobPartitions);
         String path = zkRoot + "/" + ZNODE_PARTITIONS;
         try {
-            lock.acquire();
-            int minDate = 0;
-            String forceStartFrom = readForceStartFrom();
-            if (forceStartFrom != null) {
-                try {
-                    minDate = Integer.valueOf(forceStartFrom);
-                } catch (Exception ex) {
-                    LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom);
-                    throw new IllegalStateException();
-                }
-            } else {
-                boolean pathExists = curator.checkExists().forPath(path) == null ? false : true;
-                boolean structureChanged = true;
-                if (pathExists) {
-                    int currentCount = curator.getChildren().forPath(path).size();
-                    if (numTotalPartitions == currentCount) {
-                        structureChanged = false;
-                        LOG.info("znode partitions structure is unchanged");
-                    } else {
-                        LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
-                    }
-                }
-                if (!structureChanged) {
-                    return; // do nothing
-                }
-
-                if (pathExists) {
-                    List<String> partitions = curator.getChildren().forPath(path);
+            boolean partitionPathExists = curator.checkExists().forPath(path + "/" + partitionId) != null;
+            if (partitionPathExists) {
+                LOG.info("partition path {} exists", path + "/" + partitionId);
+                List<String> partitions = curator.getChildren().forPath(path);
+                if (partitions.size() > numTotalPartitions && numTotalPartitions == partitionId + 1) {
+                        // the last partition deletes the partitions that are no longer needed
                     for (String partition : partitions) {
-                        String date = new String(curator.getData().forPath(path + "/" + partition), "UTF-8");
-                        int tmp = Integer.valueOf(date);
-                        if (tmp < minDate) {
-                            minDate = tmp;
+                        if (Integer.parseInt(partition) > partitionId) {
+                            curator.delete().deletingChildrenIfNeeded().forPath(path + "/" + partition);
+                            LOG.info("delete partition {}", path + "/" + partition);
                         }
                     }
                 }
+                return;
+            }
 
-                if (minDate == 0) {
-                    minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+            int minDate = 0;
+            boolean pathExists = curator.checkExists().forPath(path) != null;
+            if (pathExists) {
+                List<String> partitions = curator.getChildren().forPath(path);
+                for (String partition : partitions) {
+                    String date = new String(curator.getData().forPath(path + "/" + partition), "UTF-8");
+                    int tmp = Integer.valueOf(date);
+                    if (tmp < minDate) {
+                        minDate = tmp;
+                    }
                 }
             }
-            rebuildJobPartitions(numTotalPartitions, String.valueOf(minDate));
-            deleteForceStartFrom();
+
+            if (minDate == 0) {
+                minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+            }
+
+            rebuildJobPartition(partitionId, String.valueOf(minDate));
         } catch (Exception e) {
             LOG.error("fail building job partitions", e);
             throw new RuntimeException(e);
-        } finally {
-            try {
-                lock.release();
-            } catch (Exception e) {
-                LOG.error("fail releasing lock", e);
-                throw new RuntimeException(e);
-            }
         }
     }
 
-    private void rebuildJobPartitions(int numTotalPartitions, String startingDate) throws Exception {
-        LOG.info("rebuild job partitions with numTotalPartitions " + numTotalPartitions + " with starting date " + startingDate);
-        String path = zkRoot + "/" + ZNODE_PARTITIONS;
-        // truncate all existing partitions
-        if (curator.checkExists().forPath(path) != null) {
-            curator.delete().deletingChildrenIfNeeded().forPath(path);
-        }
+    private void rebuildJobPartition(int partitionId, String startingDate) throws Exception {
+        LOG.info("create job partition " + partitionId + " with starting date " + startingDate);
+        String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
 
-        for (int i = 0; i < numTotalPartitions; i++) {
-            curator.create()
+        curator.create()
                 .creatingParentsIfNeeded()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
-        }
+                .withMode(CreateMode.PERSISTENT).forPath(path, startingDate.getBytes("UTF-8"));
+
+        updateProcessedTimeStamp(partitionId, 0L);
     }
 
-    @Override
     public String readProcessedDate(int partitionId) {
         String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
         try {
@@ -229,7 +178,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public void updateProcessedDate(int partitionId, String date) {
         String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
         try {
@@ -247,7 +195,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public void addProcessedJob(String date, String jobId) {
         String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId;
         try {
@@ -265,14 +212,11 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public void truncateProcessedJob(String date) {
         LOG.info("trying to truncate all data for day " + date);
         // we need lock before we do truncate
         String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
-        InterProcessMutex lock = new InterProcessMutex(curator, path);
         try {
-            lock.acquire();
             if (curator.checkExists().forPath(path) != null) {
                 curator.delete().deletingChildrenIfNeeded().forPath(path);
                 LOG.info("really truncated all data for day " + date);
@@ -286,17 +230,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         } catch (Exception e) {
             LOG.error("fail truncating processed jobs", e);
             throw new RuntimeException(e);
-        } finally {
-            try {
-                lock.release();
-            } catch (Exception e) {
-                LOG.error("fail releasing lock", e);
-                throw new RuntimeException(e);
-            }
         }
     }
 
-    @Override
     public List<String> readProcessedJobs(String date) {
         String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
         try {
@@ -311,7 +247,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public void truncateEverything() {
         String path = zkRoot;
         try {
@@ -324,7 +259,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public long readProcessedTimeStamp(int partitionId) {
         String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
         try {
@@ -343,7 +277,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public void updateProcessedTimeStamp(int partitionId, long timeStamp) {
         String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
         try {
@@ -361,7 +294,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         }
     }
 
-    @Override
     public List<Pair<String, String>> getProcessedJobs(String date) {
         List<Pair<String, String>> result = new ArrayList<>();
         String path = zkRoot + "/" + ZNODE_JOB_IDS + "/" + date;
@@ -381,7 +313,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         return result;
     }
 
-    @Override
     public void updateProcessedJob(String date, String jobId, String status) {
         String path = zkRoot + "/" + ZNODE_JOB_IDS + "/" + date + "/" + jobId;
         try {