You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/15 09:44:55 UTC
incubator-eagle git commit: [EAGLE-770] remove InterProcessMutex from JobHistoryZKStateManager
Repository: incubator-eagle
Updated Branches:
refs/heads/master 8bb32c012 -> 446c22720
[EAGLE-770] remove InterProcessMutex from JobHistoryZKStateManager
Author: wujinhu <wu...@126.com>
Closes #655 from wujinhu/EAGLE-770.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/446c2272
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/446c2272
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/446c2272
Branch: refs/heads/master
Commit: 446c22720d930bc832fc5f3e32addf72115429a4
Parents: 8bb32c0
Author: wujinhu <wu...@126.com>
Authored: Tue Nov 15 17:44:47 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 15 17:44:47 2016 +0800
----------------------------------------------------------------------
.../history/crawler/JHFCrawlerDriverImpl.java | 6 +-
.../jpm/mr/history/storm/JobHistorySpout.java | 2 +-
.../mr/history/zkres/JobHistoryZKStateLCM.java | 47 ------
.../history/zkres/JobHistoryZKStateManager.java | 143 +++++--------------
4 files changed, 42 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 9c9374d..2c9dc8f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -250,9 +250,11 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
private void clearProcessedJob(Calendar cal) {
// clear all already processed jobs some days before current processing date (PROCESSED_JOB_KEEP_DAYS)
cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
- String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+ String date = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
- JobHistoryZKStateManager.instance().truncateProcessedJob(line);
+ if (partitionId == 0) { // the per-day jobs znode is shared by all partitions and truncateProcessedJob takes no lock, so only partition 0 truncates
+ JobHistoryZKStateManager.instance().truncateProcessedJob(date);
+ }
}
private boolean isToday() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 436dbeb..9743ea7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -139,7 +139,7 @@ public class JobHistorySpout extends BaseRichSpout {
}
JobIdFilter jobIdFilter = new JobIdFilterByPartition(new DefaultJobIdPartitioner(), numTotalPartitions, partitionId);
JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig());
- JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
+ JobHistoryZKStateManager.instance().ensureJobPartition(partitionId, numTotalPartitions); // each spout task ensures only its own partition znode
interceptor.setSpoutOutputCollector(collector);
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
deleted file mode 100644
index 0adcffe..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.zkres;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.List;
-
-public interface JobHistoryZKStateLCM {
- void ensureJobPartitions(int numTotalPartitions);
-
- String readProcessedDate(int partitionId);
-
- List<String> readProcessedJobs(String date);
-
- void updateProcessedDate(int partitionId, String date);
-
- void addProcessedJob(String date, String jobId);
-
- void updateProcessedJob(String date, String jobId, String status);
-
- void truncateProcessedJob(String date);
-
- void truncateEverything();
-
- long readProcessedTimeStamp(int partitionId);
-
- void updateProcessedTimeStamp(int partitionId, long timeStamp);
-
- List<Pair<String, String>> getProcessedJobs(String date);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/446c2272/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 78a906a..e063301 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -23,7 +23,6 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
@@ -34,12 +33,10 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
-public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
+public class JobHistoryZKStateManager {
public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
private String zkRoot;
private CuratorFramework curator;
- public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
- public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
public static final String ZNODE_PARTITIONS = "partitions";
public static final String ZNODE_JOBS = "jobs";
public static final String ZNODE_JOB_IDS = "jobIds";
@@ -69,7 +66,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
curator = newCurator(config);
curator.start();
} catch (Exception e) {
- throw new RuntimeException(e);
+ LOG.warn("fail to init or start curator (it may already be started)", e);
}
}
@@ -78,29 +75,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
curator = null;
}
- private String readForceStartFrom() {
- String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
- try {
- if (curator.checkExists().forPath(path) != null) {
- return new String(curator.getData().forPath(path), "UTF-8");
- }
- } catch (Exception ex) {
- LOG.error("fail reading forceStartFrom znode", ex);
- }
- return null;
- }
-
- private void deleteForceStartFrom() {
- String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
- try {
- if (curator.checkExists().forPath(path) != null) {
- curator.delete().forPath(path);
- }
- } catch (Exception ex) {
- LOG.error("fail reading forceStartFrom znode", ex);
- }
- }
-
private String getProcessedDateAfterBackoff(int backOffDays) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
Calendar c = Calendar.getInstance();
@@ -135,86 +109,61 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
* </p>
* .
*/
- @Override
- public void ensureJobPartitions(int numTotalPartitions) {
+ public void ensureJobPartition(int partitionId, int numTotalPartitions) {
- // lock before rebuild job partitions
+ // no lock: each task only creates its own partition znode; the last partition's task prunes surplus ones
- String lockForEnsureJobPartitions = zkRoot + "/" + ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS;
- InterProcessMutex lock = new InterProcessMutex(curator, lockForEnsureJobPartitions);
String path = zkRoot + "/" + ZNODE_PARTITIONS;
try {
- lock.acquire();
- int minDate = 0;
- String forceStartFrom = readForceStartFrom();
- if (forceStartFrom != null) {
- try {
- minDate = Integer.valueOf(forceStartFrom);
- } catch (Exception ex) {
- LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom);
- throw new IllegalStateException();
- }
- } else {
- boolean pathExists = curator.checkExists().forPath(path) == null ? false : true;
- boolean structureChanged = true;
- if (pathExists) {
- int currentCount = curator.getChildren().forPath(path).size();
- if (numTotalPartitions == currentCount) {
- structureChanged = false;
- LOG.info("znode partitions structure is unchanged");
- } else {
- LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
- }
- }
- if (!structureChanged) {
- return; // do nothing
- }
-
- if (pathExists) {
- List<String> partitions = curator.getChildren().forPath(path);
+ boolean partitionPathExists = curator.checkExists().forPath(path + "/" + partitionId) != null;
+ if (partitionPathExists) {
+ LOG.info("partition path {} exists", path + "/" + partitionId);
+ List<String> partitions = curator.getChildren().forPath(path);
+ if (partitions.size() > numTotalPartitions && numTotalPartitions == partitionId + 1) {
+ //last partition delete needless partitions
for (String partition : partitions) {
- String date = new String(curator.getData().forPath(path + "/" + partition), "UTF-8");
- int tmp = Integer.valueOf(date);
- if (tmp < minDate) {
- minDate = tmp;
+ if (Integer.parseInt(partition) > partitionId) {
+ curator.delete().deletingChildrenIfNeeded().forPath(path + "/" + partition);
+ LOG.info("delete partition {}", path + "/" + partition);
}
}
}
+ return;
+ }
- if (minDate == 0) {
- minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+ int minDate = 0; // 0 means no existing partition date has been seen yet
+ boolean pathExists = curator.checkExists().forPath(path) != null;
+ if (pathExists) {
+ List<String> partitions = curator.getChildren().forPath(path);
+ for (String partition : partitions) {
+ String date = new String(curator.getData().forPath(path + "/" + partition), "UTF-8");
+ int tmp = Integer.valueOf(date);
+ if (minDate == 0 || tmp < minDate) {
+ minDate = tmp;
+ }
}
}
- rebuildJobPartitions(numTotalPartitions, String.valueOf(minDate));
- deleteForceStartFrom();
+
+ if (minDate == 0) {
+ minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+ }
+
+ rebuildJobPartition(partitionId, String.valueOf(minDate));
} catch (Exception e) {
LOG.error("fail building job partitions", e);
throw new RuntimeException(e);
- } finally {
- try {
- lock.release();
- } catch (Exception e) {
- LOG.error("fail releasing lock", e);
- throw new RuntimeException(e);
- }
}
}
- private void rebuildJobPartitions(int numTotalPartitions, String startingDate) throws Exception {
- LOG.info("rebuild job partitions with numTotalPartitions " + numTotalPartitions + " with starting date " + startingDate);
- String path = zkRoot + "/" + ZNODE_PARTITIONS;
- // truncate all existing partitions
- if (curator.checkExists().forPath(path) != null) {
- curator.delete().deletingChildrenIfNeeded().forPath(path);
- }
+ private void rebuildJobPartition(int partitionId, String startingDate) throws Exception {
+ LOG.info("create job partition " + partitionId + " with starting date " + startingDate);
+ String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
- for (int i = 0; i < numTotalPartitions; i++) {
- curator.create()
+ curator.create()
.creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
- }
+ .withMode(CreateMode.PERSISTENT).forPath(path, startingDate.getBytes("UTF-8"));
+
+ updateProcessedTimeStamp(partitionId, 0L);
}
- @Override
public String readProcessedDate(int partitionId) {
String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
try {
@@ -229,7 +178,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public void updateProcessedDate(int partitionId, String date) {
String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
try {
@@ -247,7 +195,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public void addProcessedJob(String date, String jobId) {
String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId;
try {
@@ -265,14 +212,11 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public void truncateProcessedJob(String date) {
LOG.info("trying to truncate all data for day " + date);
- // we need lock before we do truncate
+ // no lock here: callers must ensure that only one task truncates a given day
String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
- InterProcessMutex lock = new InterProcessMutex(curator, path);
try {
- lock.acquire();
if (curator.checkExists().forPath(path) != null) {
curator.delete().deletingChildrenIfNeeded().forPath(path);
LOG.info("really truncated all data for day " + date);
@@ -286,17 +230,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
} catch (Exception e) {
LOG.error("fail truncating processed jobs", e);
throw new RuntimeException(e);
- } finally {
- try {
- lock.release();
- } catch (Exception e) {
- LOG.error("fail releasing lock", e);
- throw new RuntimeException(e);
- }
}
}
- @Override
public List<String> readProcessedJobs(String date) {
String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
try {
@@ -311,7 +247,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public void truncateEverything() {
String path = zkRoot;
try {
@@ -324,7 +259,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public long readProcessedTimeStamp(int partitionId) {
String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
try {
@@ -343,7 +277,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public void updateProcessedTimeStamp(int partitionId, long timeStamp) {
String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
try {
@@ -361,7 +294,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
}
}
- @Override
public List<Pair<String, String>> getProcessedJobs(String date) {
List<Pair<String, String>> result = new ArrayList<>();
String path = zkRoot + "/" + ZNODE_JOB_IDS + "/" + date;
@@ -381,7 +313,6 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
return result;
}
- @Override
public void updateProcessedJob(String date, String jobId, String status) {
String path = zkRoot + "/" + ZNODE_JOB_IDS + "/" + date + "/" + jobId;
try {