You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/11 03:07:31 UTC
incubator-eagle git commit: [EAGLE-443] refactor ProcessedTimeStamp
[Forced Update!]
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 2a99ace38 -> c65138b8b (forced update)
[EAGLE-443] refactor ProcessedTimeStamp
Author: jinhuwu <wu...@126.com>
Closes #326 from wujinhu/EAGLE-443.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c65138b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c65138b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c65138b8
Branch: refs/heads/develop
Commit: c65138b8b4a4ff1695c2edd93fdfad2d0f563ece
Parents: e2532a1
Author: jinhuwu <wu...@126.com>
Authored: Thu Aug 11 11:01:02 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Thu Aug 11 11:06:05 2016 +0800
----------------------------------------------------------------------
.../eagle/jpm/mr/history/MRHistoryJobMain.java | 3 -
.../history/storm/HistoryJobProgressBolt.java | 132 -------------------
.../jpm/mr/history/storm/JobHistorySpout.java | 75 ++++++++++-
.../mr/history/zkres/JobHistoryZKStateLCM.java | 2 +
.../history/zkres/JobHistoryZKStateManager.java | 49 ++++++-
5 files changed, 119 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index ffa2f22..c6f1b98 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -25,7 +25,6 @@ import backtype.storm.topology.TopologyBuilder;
import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
-import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
import org.apache.eagle.jpm.util.Constants;
@@ -59,7 +58,6 @@ public class MRHistoryJobMain {
topologyName = jhfAppConf.getString("envContextConfig.topologyName");
}
String spoutName = "mrHistoryJobExecutor";
- String boltName = "updateProcessTime";
int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
if (parallelism > tasks) {
@@ -70,7 +68,6 @@ public class MRHistoryJobMain {
new JobHistorySpout(filter, jhfConfigManager),
parallelism
).setNumTasks(tasks);
- topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
Config config = new backtype.storm.Config();
config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers"));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
deleted file mode 100644
index 30374c4..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-
-import java.util.*;
-
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HistoryJobProgressBolt extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(HistoryJobProgressBolt.class);
-
- private final static int MAX_RETRY_TIMES = 3;
- private Long m_minTimeStamp;
- private int m_numTotalPartitions;
- private JHFConfigManager configManager;
- private Map<Integer, Long> m_partitionTimeStamp = new TreeMap<>();
- public HistoryJobProgressBolt(String parentName, JHFConfigManager configManager) {
- this.configManager = configManager;
- m_numTotalPartitions = this.configManager.getConfig().getInt("envContextConfig.parallelismConfig." + parentName);
- m_minTimeStamp = 0L;
- }
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-
- }
-
- @Override
- public void execute(Tuple tuple) {
- Integer partitionId = tuple.getIntegerByField("partitionId");
- Long timeStamp = tuple.getLongByField("timeStamp");
- LOG.info("partition " + partitionId + ", timeStamp " + timeStamp);
- if (!m_partitionTimeStamp.containsKey(partitionId) || (m_partitionTimeStamp.containsKey(partitionId) && m_partitionTimeStamp.get(partitionId) < timeStamp)) {
- m_partitionTimeStamp.put(partitionId, timeStamp);
- }
-
- if (m_partitionTimeStamp.size() >= m_numTotalPartitions) {
- //get min timestamp
- Long minTimeStamp = Collections.min(m_partitionTimeStamp.values());
-
- if (m_minTimeStamp == 0L) {
- m_minTimeStamp = minTimeStamp;
- }
-
- if (m_minTimeStamp > minTimeStamp) {
- //no need to update
- return;
- }
-
- m_minTimeStamp = minTimeStamp;
- final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
- Map<String, String> baseTags = new HashMap<String, String>() { {
- put("site", jobExtractorConfig.site);
- } };
- JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
- entity.setCurrentTimeStamp(m_minTimeStamp);
- entity.setTimestamp(m_minTimeStamp);
- entity.setTags(baseTags);
-
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
-
- client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
-
- List<JobProcessTimeStampEntity> entities = new ArrayList<>();
- entities.add(entity);
-
- int tried = 0;
- while (tried <= MAX_RETRY_TIMES) {
- try {
- LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size());
- client.create(entities);
- LOG.info("finish flushing entities of total number " + entities.size());
- break;
- } catch (Exception ex) {
- if (tried < MAX_RETRY_TIMES) {
- LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
- } else {
- LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
- }
- }
- tried ++;
- }
-
- client.getJerseyClient().destroy();
- try {
- client.close();
- } catch (Exception e) {
- LOG.error("failed to close eagle service client ", e);
- }
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
- }
- @Override
- public void cleanup() {
- super.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index a10599b..a0cdba7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -26,10 +26,15 @@ import backtype.storm.tuple.Fields;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
import org.apache.eagle.jpm.mr.history.crawler.*;
+import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,6 +91,7 @@ public class JobHistorySpout extends BaseRichSpout {
private JHFInputStreamCallback callback;
private JHFConfigManager configManager;
private JobHistoryLCM m_jhfLCM;
+ private final static int MAX_RETRY_TIMES = 3;
public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) {
this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
@@ -159,7 +165,8 @@ public class JobHistorySpout extends BaseRichSpout {
public void nextTuple() {
try {
Long modifiedTime = driver.crawl();
- interceptor.collect(new ValuesArray(partitionId, modifiedTime));
+ zkState.updateProcessedTimeStamp(partitionId, modifiedTime);
+ updateProcessedTimeStamp(modifiedTime);
} catch (Exception ex) {
LOG.error("fail crawling job history file and continue ...", ex);
try {
@@ -181,7 +188,6 @@ public class JobHistorySpout extends BaseRichSpout {
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("partitionId", "timeStamp"));
}
/**
@@ -205,4 +211,69 @@ public class JobHistorySpout extends BaseRichSpout {
@Override
public void close() {
}
+
+ /**
+  * Publishes the overall processed timestamp to the Eagle service.
+  *
+  * <p>Only the task whose {@code partitionId} is 0 publishes. The published value is the
+  * minimum of {@code modifiedTime} (this task's own progress) and the timestamps read from
+  * ZooKeeper state for partitions {@code 1 .. numTotalPartitions - 1}. A minimum of 0 is
+  * never published.
+  *
+  * @param modifiedTime timestamp returned by the latest crawl of this partition
+  */
+ private void updateProcessedTimeStamp(long modifiedTime) {
+     if (partitionId != 0) {
+         return;
+     }
+
+     // partition 0's own progress is modifiedTime, so only the other partitions are read back
+     long minTimeStamp = modifiedTime;
+     for (int i = 1; i < numTotalPartitions; i++) {
+         minTimeStamp = Math.min(minTimeStamp, zkState.readProcessedTimeStamp(i));
+     }
+
+     // 0 is what readProcessedTimeStamp returns for a partition with no stored value yet
+     if (minTimeStamp == 0L) {
+         return;
+     }
+
+     LOG.info("update process time stamp {}", minTimeStamp);
+     flushProcessedTimeStamp(minTimeStamp);
+ }
+
+ /**
+  * Writes a single JobProcessTimeStampEntity, tagged with the configured site, to the Eagle
+  * service. A failed create is retried up to MAX_RETRY_TIMES more times, and the client is
+  * released even when configuring or using it throws.
+  *
+  * @param timeStamp value stored as both the entity timestamp and its current timestamp
+  */
+ private void flushProcessedTimeStamp(long timeStamp) {
+     final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+     final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+
+     // plain map instead of double-brace initialization, which creates an anonymous
+     // HashMap subclass on every call
+     Map<String, String> baseTags = new HashMap<>();
+     baseTags.put("site", jobExtractorConfig.site);
+
+     JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
+     entity.setCurrentTimeStamp(timeStamp);
+     entity.setTimestamp(timeStamp);
+     entity.setTags(baseTags);
+
+     List<JobProcessTimeStampEntity> entities = new ArrayList<>();
+     entities.add(entity);
+
+     IEagleServiceClient client = new EagleServiceClientImpl(
+         eagleServiceConfig.eagleServiceHost,
+         eagleServiceConfig.eagleServicePort,
+         eagleServiceConfig.username,
+         eagleServiceConfig.password);
+     try {
+         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+
+         // one initial attempt plus MAX_RETRY_TIMES retries
+         for (int tried = 0; tried <= MAX_RETRY_TIMES; tried++) {
+             try {
+                 LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size());
+                 client.create(entities);
+                 LOG.info("finish flushing entities of total number " + entities.size());
+                 break;
+             } catch (Exception ex) {
+                 if (tried < MAX_RETRY_TIMES) {
+                     LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                 } else {
+                     LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                 }
+             }
+         }
+     } finally {
+         // release the client on every path, including a failure before the first attempt
+         try {
+             client.getJerseyClient().destroy();
+             client.close();
+         } catch (Exception e) {
+             LOG.error("failed to close eagle service client ", e);
+         }
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
index 308057b..933b347 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -28,4 +28,6 @@ public interface JobHistoryZKStateLCM {
void addProcessedJob(String date, String jobId);
void truncateProcessedJob(String date);
void truncateEverything();
+ /**
+  * Reads the processed timestamp stored for the given partition.
+  *
+  * @param partitionId partition whose timestamp is read
+  * @return the stored timestamp; JobHistoryZKStateManager returns 0 when nothing is stored yet
+  */
+ long readProcessedTimeStamp(int partitionId);
+ /**
+  * Stores the processed timestamp for the given partition, replacing any previous value.
+  *
+  * @param partitionId partition whose timestamp is written
+  * @param timeStamp value to store
+  */
+ void updateProcessedTimeStamp(int partitionId, long timeStamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 24dd7be..33d3cb2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -38,6 +38,8 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
public static final String ZNODE_PARTITIONS = "partitions";
+ public static final String ZNODE_JOBS = "jobs";
+ public static final String ZNODE_TIMESTAMPS = "timeStamps";
public static final int BACKOFF_DAYS = 0;
@@ -201,7 +203,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
@Override
public String readProcessedDate(int partitionId) {
- String path = zkRoot + "/partitions/" + partitionId;
+ String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
try {
if (_curator.checkExists().forPath(path) != null) {
return new String(_curator.getData().forPath(path), "UTF-8");
@@ -216,7 +218,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
@Override
public void updateProcessedDate(int partitionId, String date) {
- String path = zkRoot + "/partitions/" + partitionId;
+ String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
@@ -234,7 +236,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
@Override
public void addProcessedJob(String date, String jobId) {
- String path = zkRoot + "/jobs/" + date + "/" + jobId;
+ String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId;
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
@@ -254,7 +256,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
public void truncateProcessedJob(String date) {
LOG.info("trying to truncate all data for day " + date);
// we need lock before we do truncate
- String path = zkRoot + "/jobs/" + date;
+ String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
InterProcessMutex lock = new InterProcessMutex(_curator, path);
try {
lock.acquire();
@@ -277,7 +279,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
@Override
public List<String> readProcessedJobs(String date) {
- String path = zkRoot + "/jobs/" + date;
+ String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
try {
if (_curator.checkExists().forPath(path) != null) {
return _curator.getChildren().forPath(path);
@@ -302,4 +304,41 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
throw new RuntimeException(ex);
}
}
+
+ /**
+  * Reads the processed timestamp stored for a partition.
+  *
+  * <p>The value lives at {@code <zkRoot>/partitions/<partitionId>/timeStamps}. When that znode
+  * does not exist it is created, seeded with "0", and 0 is returned.
+  *
+  * @param partitionId partition whose timestamp is read
+  * @return the stored timestamp, or 0 when none has been recorded yet
+  * @throws RuntimeException wrapping any failure to access ZooKeeper or to parse the stored value
+  */
+ @Override
+ public long readProcessedTimeStamp(int partitionId) {
+     String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
+     try {
+         if (_curator.checkExists().forPath(path) == null) {
+             // Seed the node explicitly: forPath(path) without a payload stores Curator's
+             // default data (the local address unless configured otherwise), which would make
+             // the next read fail in Long.parseLong if no update happened in between.
+             _curator.create()
+                 .creatingParentsIfNeeded()
+                 .withMode(CreateMode.PERSISTENT)
+                 .forPath(path, "0".getBytes("UTF-8"));
+             return 0L;
+         }
+         return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8"));
+     } catch (Exception e) {
+         LOG.error("fail to read timeStamp for partition " + partitionId, e);
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Stores the processed timestamp of a partition at
+  * {@code <zkRoot>/partitions/<partitionId>/timeStamps}, creating the znode (and any missing
+  * parents) first when it does not exist. The value is written as its decimal string in UTF-8.
+  *
+  * @param partitionId partition whose timestamp is written
+  * @param timeStamp value to store
+  * @throws RuntimeException wrapping any failure while talking to ZooKeeper
+  */
+ @Override
+ public void updateProcessedTimeStamp(int partitionId, long timeStamp) {
+     final String znode = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
+     try {
+         final boolean missing = _curator.checkExists().forPath(znode) == null;
+         if (missing) {
+             _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(znode);
+         }
+
+         final byte[] payload = String.valueOf(timeStamp).getBytes("UTF-8");
+         _curator.setData().forPath(znode, payload);
+     } catch (Exception zkFailure) {
+         LOG.error("fail to update timeStamp for partition " + partitionId, zkFailure);
+         throw new RuntimeException(zkFailure);
+     }
+ }
}