You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/01 03:26:29 UTC
incubator-eagle git commit: [EAGLE-515] refactor zk manager to singleton
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 8de69a88d -> c940f56c2
[EAGLE-515] refactor zk manager to singleton
Author: wujinhu <wu...@126.com>
Closes #411 from wujinhu/EAGLE-515.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c940f56c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c940f56c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c940f56c
Branch: refs/heads/develop
Commit: c940f56c27296c5fec657bd240e96c5522199262
Parents: 8de69a8
Author: wujinhu <wu...@126.com>
Authored: Thu Sep 1 11:26:10 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Sep 1 11:26:10 2016 +0800
----------------------------------------------------------------------
.../mr/history/crawler/JHFCrawlerDriverImpl.java | 17 +++++++----------
.../JobEntityCreationEagleServiceListener.java | 4 +---
.../jpm/mr/history/storm/JobHistorySpout.java | 10 ++++------
.../mr/history/zkres/JobHistoryZKStateManager.java | 8 +++++++-
4 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 1a17751..077f4e1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
-import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
import org.apache.eagle.jpm.mr.historyentity.JobCountEntity;
import org.apache.eagle.jpm.util.JobIdFilter;
import org.apache.commons.lang3.tuple.Pair;
@@ -58,7 +58,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
private JHFInputStreamCallback reader;
protected boolean zeroBasedMonth = true;
- private JobHistoryZKStateLCM zkStateLcm;
private JobHistoryLCM jhfLCM;
private JobIdFilter jobFilter;
private int partitionId;
@@ -69,7 +68,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
- JobHistoryZKStateLCM zkStateLCM,
JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
this.eagleServiceConfig = eagleServiceConfig;
this.jobExtractorConfig = jobExtractorConfig;
@@ -80,7 +78,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
}
this.reader = reader;
jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
- this.zkStateLcm = zkStateLCM;
this.partitionId = partitionId;
this.jobFilter = jobFilter;
timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
@@ -187,7 +184,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
jobHistoryFile,
reader);
}
- zkStateLcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
+ JobHistoryZKStateManager.instance().addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
this.processDate.year,
this.processDate.month + 1,
this.processDate.day),
@@ -202,7 +199,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
private void updateProcessDate() throws Exception {
String line = String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year,
this.processDate.month + 1, this.processDate.day);
- zkStateLcm.updateProcessedDate(partitionId, line);
+ JobHistoryZKStateManager.instance().updateProcessedDate(partitionId, line);
}
private int getActualMonth(int month) {
@@ -220,7 +217,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
}
private void readAndCacheLastProcessedDate() throws Exception {
- String lastProcessedDate = zkStateLcm.readProcessedDate(partitionId);
+ String lastProcessedDate = JobHistoryZKStateManager.instance().readProcessedDate(partitionId);
Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate);
if (m.find() && m.groupCount() == 3) {
this.processDate.year = Integer.parseInt(m.group(1));
@@ -233,7 +230,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
GregorianCalendar cal = new GregorianCalendar(timeZone);
cal.set(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0);
cal.add(Calendar.DATE, 1);
- List<String> list = zkStateLcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+ List<String> list = JobHistoryZKStateManager.instance().readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)));
if (list != null) {
this.processedJobFileNames = new HashSet<>(list);
@@ -241,7 +238,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
}
private void flushJobCount() throws Exception {
- List<Pair<String, String>> jobs = zkStateLcm.getProcessedJobs(
+ List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(
String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day)
);
JobCountEntity entity = new JobCountEntity();
@@ -300,7 +297,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
- zkStateLcm.truncateProcessedJob(line);
+ JobHistoryZKStateManager.instance().truncateProcessedJob(line);
}
private boolean isToday() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 520fbbc..30eeb54 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -90,13 +90,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
- JobHistoryZKStateManager zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
logger.info("start flushing entities of total number " + list.size());
for (int i = 0; i < list.size(); i++) {
JobBaseAPIEntity entity = list.get(i);
if (entity instanceof JobExecutionAPIEntity) {
jobs.add((JobExecutionAPIEntity) entity);
- zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
+ JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
entity.getTags().get(MRJobTagName.JOB_ID.toString()),
((JobExecutionAPIEntity) entity).getCurrentState());
} else if (entity instanceof JobEventAPIEntity) {
@@ -107,7 +106,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
}
}
- zkState.close();
GenericServiceAPIResponseEntity result;
if (jobs.size() > 0) {
logger.info("flush JobExecutionAPIEntity of number " + jobs.size());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 402f93e..04283d3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -87,7 +87,6 @@ public class JobHistorySpout extends BaseRichSpout {
private int partitionId;
private int numTotalPartitions;
- private transient JobHistoryZKStateManager zkState;
private transient JHFCrawlerDriver driver;
private JobHistoryContentFilter contentFilter;
private JobHistorySpoutCollectorInterceptor interceptor;
@@ -143,8 +142,8 @@ public class JobHistorySpout extends BaseRichSpout {
throw new IllegalStateException(e);
}
JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
- zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
- zkState.ensureJobPartitions(numTotalPartitions);
+ JobHistoryZKStateManager.instance().init(configManager.getZkStateConfig());
+ JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
interceptor.setSpoutOutputCollector(collector);
try {
@@ -154,7 +153,6 @@ public class JobHistorySpout extends BaseRichSpout {
configManager.getJobExtractorConfig(),
configManager.getControlConfig(),
callback,
- zkState,
jhfLCM,
jobIdFilter,
partitionId);
@@ -168,7 +166,7 @@ public class JobHistorySpout extends BaseRichSpout {
public void nextTuple() {
try {
Long modifiedTime = driver.crawl();
- zkState.updateProcessedTimeStamp(partitionId, modifiedTime);
+ JobHistoryZKStateManager.instance().updateProcessedTimeStamp(partitionId, modifiedTime);
updateProcessedTimeStamp(modifiedTime);
} catch (Exception ex) {
LOG.error("fail crawling job history file and continue ...", ex);
@@ -223,7 +221,7 @@ public class JobHistorySpout extends BaseRichSpout {
//update latest process time
long minTimeStamp = modifiedTime;
for (int i = 1; i < numTotalPartitions; i++) {
- long time = zkState.readProcessedTimeStamp(i);
+ long time = JobHistoryZKStateManager.instance().readProcessedTimeStamp(i);
if (time <= minTimeStamp) {
minTimeStamp = time;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c940f56c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index c61d05a..2e64da3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -47,6 +47,8 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
public static final int BACKOFF_DAYS = 0;
+ private static JobHistoryZKStateManager jobHistoryZKStateManager = new JobHistoryZKStateManager();
+
private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
return CuratorFrameworkFactory.newClient(
config.zkQuorum,
@@ -56,7 +58,11 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
);
}
- public JobHistoryZKStateManager(ZKStateConfig config) {
+ public static JobHistoryZKStateManager instance() {
+ return jobHistoryZKStateManager;
+ }
+
+ public void init(ZKStateConfig config) {
this.zkRoot = config.zkRoot;
try {