You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/11 03:07:31 UTC

incubator-eagle git commit: [EAGLE-443] refactor ProcessedTimeStamp [Forced Update!]

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 2a99ace38 -> c65138b8b (forced update)


[EAGLE-443] refactor ProcessedTimeStamp

Author: jinhuwu <wu...@126.com>

Closes #326 from wujinhu/EAGLE-443.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c65138b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c65138b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c65138b8

Branch: refs/heads/develop
Commit: c65138b8b4a4ff1695c2edd93fdfad2d0f563ece
Parents: e2532a1
Author: jinhuwu <wu...@126.com>
Authored: Thu Aug 11 11:01:02 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Thu Aug 11 11:06:05 2016 +0800

----------------------------------------------------------------------
 .../eagle/jpm/mr/history/MRHistoryJobMain.java  |   3 -
 .../history/storm/HistoryJobProgressBolt.java   | 132 -------------------
 .../jpm/mr/history/storm/JobHistorySpout.java   |  75 ++++++++++-
 .../mr/history/zkres/JobHistoryZKStateLCM.java  |   2 +
 .../history/zkres/JobHistoryZKStateManager.java |  49 ++++++-
 5 files changed, 119 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index ffa2f22..c6f1b98 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -25,7 +25,6 @@ import backtype.storm.topology.TopologyBuilder;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
-import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
 import org.apache.eagle.jpm.util.Constants;
 
@@ -59,7 +58,6 @@ public class MRHistoryJobMain {
                 topologyName = jhfAppConf.getString("envContextConfig.topologyName");
             }
             String spoutName = "mrHistoryJobExecutor";
-            String boltName = "updateProcessTime";
             int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
             int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
             if (parallelism > tasks) {
@@ -70,7 +68,6 @@ public class MRHistoryJobMain {
                     new JobHistorySpout(filter, jhfConfigManager),
                     parallelism
             ).setNumTasks(tasks);
-            topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
 
             Config config = new backtype.storm.Config();
             config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers"));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
deleted file mode 100644
index 30374c4..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-
-import java.util.*;
-
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HistoryJobProgressBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(HistoryJobProgressBolt.class);
-
-    private final static int MAX_RETRY_TIMES = 3;
-    private Long m_minTimeStamp;
-    private int m_numTotalPartitions;
-    private JHFConfigManager configManager;
-    private Map<Integer, Long> m_partitionTimeStamp = new TreeMap<>();
-    public HistoryJobProgressBolt(String parentName, JHFConfigManager configManager) {
-        this.configManager = configManager;
-        m_numTotalPartitions = this.configManager.getConfig().getInt("envContextConfig.parallelismConfig." + parentName);
-        m_minTimeStamp = 0L;
-    }
-    @Override
-    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        Integer partitionId = tuple.getIntegerByField("partitionId");
-        Long timeStamp = tuple.getLongByField("timeStamp");
-        LOG.info("partition " + partitionId + ", timeStamp " + timeStamp);
-        if (!m_partitionTimeStamp.containsKey(partitionId) || (m_partitionTimeStamp.containsKey(partitionId) && m_partitionTimeStamp.get(partitionId) < timeStamp)) {
-            m_partitionTimeStamp.put(partitionId, timeStamp);
-        }
-
-        if (m_partitionTimeStamp.size() >= m_numTotalPartitions) {
-            //get min timestamp
-            Long minTimeStamp = Collections.min(m_partitionTimeStamp.values());
-
-            if (m_minTimeStamp == 0L) {
-                m_minTimeStamp = minTimeStamp;
-            }
-
-            if (m_minTimeStamp > minTimeStamp) {
-                //no need to update
-                return;
-            }
-
-            m_minTimeStamp = minTimeStamp;
-            final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-            final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
-            Map<String, String> baseTags = new HashMap<String, String>() { {
-                put("site", jobExtractorConfig.site);
-            } };
-            JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
-            entity.setCurrentTimeStamp(m_minTimeStamp);
-            entity.setTimestamp(m_minTimeStamp);
-            entity.setTags(baseTags);
-
-            IEagleServiceClient client = new EagleServiceClientImpl(
-                    eagleServiceConfig.eagleServiceHost,
-                    eagleServiceConfig.eagleServicePort,
-                    eagleServiceConfig.username,
-                    eagleServiceConfig.password);
-
-            client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
-
-            List<JobProcessTimeStampEntity> entities = new ArrayList<>();
-            entities.add(entity);
-
-            int tried = 0;
-            while (tried <= MAX_RETRY_TIMES) {
-                try {
-                    LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size());
-                    client.create(entities);
-                    LOG.info("finish flushing entities of total number " + entities.size());
-                    break;
-                } catch (Exception ex) {
-                    if (tried < MAX_RETRY_TIMES) {
-                        LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
-                    } else {
-                        LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
-                    }
-                }
-                tried ++;
-            }
-
-            client.getJerseyClient().destroy();
-            try {
-                client.close();
-            } catch (Exception e) {
-                LOG.error("failed to close eagle service client ", e);
-            }
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
-    }
-    @Override
-    public void cleanup() {
-        super.cleanup();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index a10599b..a0cdba7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -26,10 +26,15 @@ import backtype.storm.tuple.Fields;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.crawler.*;
+import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -86,6 +91,7 @@ public class JobHistorySpout extends BaseRichSpout {
     private JHFInputStreamCallback callback;
     private JHFConfigManager configManager;
     private JobHistoryLCM m_jhfLCM;
+    private final static int MAX_RETRY_TIMES = 3;
 
     public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) {
         this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
@@ -159,7 +165,8 @@ public class JobHistorySpout extends BaseRichSpout {
     public void nextTuple() {
         try {
             Long modifiedTime = driver.crawl();
-            interceptor.collect(new ValuesArray(partitionId, modifiedTime));
+            zkState.updateProcessedTimeStamp(partitionId, modifiedTime);
+            updateProcessedTimeStamp(modifiedTime);
         } catch (Exception ex) {
             LOG.error("fail crawling job history file and continue ...", ex);
             try {
@@ -181,7 +188,6 @@ public class JobHistorySpout extends BaseRichSpout {
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("partitionId", "timeStamp"));
     }
 
     /**
@@ -205,4 +211,69 @@ public class JobHistorySpout extends BaseRichSpout {
     @Override
     public void close() {
     }
+
+    private void updateProcessedTimeStamp(long modifiedTime) {
+        if (partitionId != 0) {
+            return;
+        }
+
+        //update latest process time
+        long minTimeStamp = modifiedTime;
+        for (int i = 1; i < numTotalPartitions; i++) {
+            long time = zkState.readProcessedTimeStamp(i);
+            if (time <= minTimeStamp) {
+                minTimeStamp = time;
+            }
+        }
+
+        if (minTimeStamp == 0l) {
+            return;
+        }
+
+        LOG.info("update process time stamp {}", minTimeStamp);
+        final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        Map<String, String> baseTags = new HashMap<String, String>() { {
+            put("site", jobExtractorConfig.site);
+        } };
+        JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
+        entity.setCurrentTimeStamp(minTimeStamp);
+        entity.setTimestamp(minTimeStamp);
+        entity.setTags(baseTags);
+
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+
+        List<JobProcessTimeStampEntity> entities = new ArrayList<>();
+        entities.add(entity);
+
+        int tried = 0;
+        while (tried <= MAX_RETRY_TIMES) {
+            try {
+                LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size());
+                client.create(entities);
+                LOG.info("finish flushing entities of total number " + entities.size());
+                break;
+            } catch (Exception ex) {
+                if (tried < MAX_RETRY_TIMES) {
+                    LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                } else {
+                    LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                }
+            }
+            tried ++;
+        }
+
+        client.getJerseyClient().destroy();
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.error("failed to close eagle service client ", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
index 308057b..933b347 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -28,4 +28,6 @@ public interface JobHistoryZKStateLCM {
     void addProcessedJob(String date, String jobId);
     void truncateProcessedJob(String date);
     void truncateEverything();
+    long readProcessedTimeStamp(int partitionId);
+    void updateProcessedTimeStamp(int partitionId, long timeStamp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c65138b8/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 24dd7be..33d3cb2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -38,6 +38,8 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
     public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
     public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
     public static final String ZNODE_PARTITIONS = "partitions";
+    public static final String ZNODE_JOBS = "jobs";
+    public static final String ZNODE_TIMESTAMPS = "timeStamps";
 
     public static final int BACKOFF_DAYS = 0;
 
@@ -201,7 +203,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
     @Override
     public String readProcessedDate(int partitionId) {
-        String path = zkRoot + "/partitions/" + partitionId;
+        String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
         try {
             if (_curator.checkExists().forPath(path) != null) {
                 return new String(_curator.getData().forPath(path), "UTF-8");
@@ -216,7 +218,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
     @Override
     public void updateProcessedDate(int partitionId, String date) {
-        String path = zkRoot + "/partitions/" + partitionId;
+        String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId;
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
@@ -234,7 +236,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
     @Override
     public void addProcessedJob(String date, String jobId) {
-        String path = zkRoot + "/jobs/" + date + "/" + jobId;
+        String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId;
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
@@ -254,7 +256,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
     public void truncateProcessedJob(String date) {
         LOG.info("trying to truncate all data for day " + date);
         // we need lock before we do truncate
-        String path = zkRoot + "/jobs/" + date;
+        String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
         InterProcessMutex lock = new InterProcessMutex(_curator, path);
         try {
             lock.acquire();
@@ -277,7 +279,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
     @Override
     public List<String> readProcessedJobs(String date) {
-        String path = zkRoot + "/jobs/" + date;
+        String path = zkRoot + "/" + ZNODE_JOBS + "/" + date;
         try {
             if (_curator.checkExists().forPath(path) != null) {
                 return _curator.getChildren().forPath(path);
@@ -302,4 +304,41 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
             throw new RuntimeException(ex);
         }
     }
+
+    @Override
+    public long readProcessedTimeStamp(int partitionId) {
+        String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
+        try {
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path);
+                return 0l;
+            } else {
+                return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8"));
+            }
+        } catch (Exception e) {
+            LOG.error("fail to read timeStamp for partition " + partitionId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void updateProcessedTimeStamp(int partitionId, long timeStamp) {
+        String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS;
+        try {
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path);
+            }
+
+            _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8"));
+        } catch (Exception e) {
+            LOG.error("fail to update timeStamp for partition " + partitionId, e);
+            throw new RuntimeException(e);
+        }
+    }
 }