You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/07 13:12:35 UTC

[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-497] add trigger manager for agent (#384)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new 7fdced8  [TUBEMQ-497] add trigger manager for agent (#384)
7fdced8 is described below

commit 7fdced8f54045a28c022bea9122f8b9a1f758ae5
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Thu Jan 7 21:12:27 2021 +0800

    [TUBEMQ-497] add trigger manager for agent (#384)
---
 .../apache/tubemq/agent/conf/ProfileFetcher.java   |  36 ++++++
 .../org/apache/tubemq/agent/core/AgentManager.java |  31 +++++
 .../tubemq/agent/core/trigger/TriggerManager.java  | 126 +++++++++++++++++++++
 3 files changed, 193 insertions(+)

diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java
new file mode 100644
index 0000000..ec47e63
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.agent.conf;
+
+import java.util.List;
+import org.apache.tubemq.agent.common.Service;
+
+/**
+ * Fetches profiles from another system; communicates with JSON format strings.
+ */
+public interface ProfileFetcher extends Service {
+
+    /**
+     * Gets the job profiles.
+     * @return - job profile list
+     */
+    List<JobProfile> getJobProfiles();
+
+    /**
+     * Gets the trigger profiles.
+     * @return - trigger profile list
+     */
+    List<TriggerProfile>  getTriggerProfiles();
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
index 1c8bdc6..d5513ec 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -16,11 +16,14 @@ package org.apache.tubemq.agent.core;
 
 import org.apache.tubemq.agent.common.AbstractDaemon;
 import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.ProfileFetcher;
 import org.apache.tubemq.agent.constants.AgentConstants;
 import org.apache.tubemq.agent.core.job.JobManager;
 import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.core.trigger.TriggerManager;
 import org.apache.tubemq.agent.db.DB;
 import org.apache.tubemq.agent.db.JobProfileDB;
+import org.apache.tubemq.agent.db.TriggerProfileDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,15 +36,20 @@ public class AgentManager extends AbstractDaemon {
     private static final Logger LOGGER = LoggerFactory.getLogger(AgentManager.class);
     private static JobManager jobManager;
     private static TaskManager taskManager;
+    private static TriggerManager triggerManager;
+
 
 
     private final long waitTime;
+    private final ProfileFetcher fetcher;
     private final AgentConfiguration conf;
     private final DB db;
 
     public AgentManager() {
         conf = AgentConfiguration.getAgentConf();
         this.db = initDB();
+        fetcher = initFetcher();
+        triggerManager = new TriggerManager(this, new TriggerProfileDB(db));
         jobManager = new JobManager(this, new JobProfileDB(db));
         taskManager = new TaskManager(this);
 
@@ -49,6 +57,21 @@ public class AgentManager extends AbstractDaemon {
             AgentConstants.THREAD_POOL_AWAIT_TIME, AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
     }
 
+    /**
+     * Instantiates the profile fetcher named by {@link AgentConstants#AGENT_FETCHER_CLASSNAME}.
+     *
+     * @return the fetcher instance, or {@code null} when it cannot be loaded or created
+     */
+    private ProfileFetcher initFetcher() {
+        ProfileFetcher result = null;
+        try {
+            String className = conf.get(AgentConstants.AGENT_FETCHER_CLASSNAME);
+            result = (ProfileFetcher) Class.forName(className).newInstance();
+        } catch (Exception ex) {
+            LOGGER.warn("cannot find fetcher, ignore it {}", ex.getMessage());
+        }
+        return result;
+    }
 
     /**
      * init db by class name
@@ -85,8 +108,12 @@ public class AgentManager extends AbstractDaemon {
     @Override
     public void start() throws Exception {
         LOGGER.info("starting agent manager");
+        triggerManager.start();
         jobManager.start();
         taskManager.start();
+        if (fetcher != null) {
+            fetcher.start();
+        }
     }
 
     /**
@@ -96,9 +123,13 @@ public class AgentManager extends AbstractDaemon {
      */
     @Override
     public void stop() throws Exception {
+        if (fetcher != null) {
+            fetcher.stop();
+        }
         // TODO: change job state which is in running state.
         LOGGER.info("stopping agent manager");
         // close in order: trigger -> job -> task
+        triggerManager.stop();
         jobManager.stop();
         taskManager.stop();
         this.db.close();
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java
new file mode 100644
index 0000000..df0701f
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.trigger;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.db.TriggerProfileDB;
+import org.apache.tubemq.agent.plugin.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages triggers: keeps the running triggers by job id, persists their profiles
+ * and periodically hands the job profiles they produce to the job manager.
+ */
+public class TriggerManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
+
+    private final AgentManager manager;
+    private final TriggerProfileDB triggerProfileDB;
+    private final ConcurrentHashMap<String, Trigger> triggerMap;
+    private final AgentConfiguration conf;
+    // pause between two fetch rounds, in seconds
+    private final int triggerFetchInterval;
+
+    public TriggerManager(AgentManager manager, TriggerProfileDB triggerProfileDB) {
+        this.conf = AgentConfiguration.getAgentConf();
+        this.manager = manager;
+        this.triggerProfileDB = triggerProfileDB;
+        this.triggerMap = new ConcurrentHashMap<>();
+        this.triggerFetchInterval = conf.getInt(AgentConstants.TRIGGER_FETCH_INTERVAL,
+                AgentConstants.DEFAULT_TRIGGER_FETCH_INTERVAL);
+    }
+
+    /**
+     * submit trigger profile: store it, then create, init and run its trigger.
+     * A profile that lacks a required key is ignored. The trigger class is instantiated
+     * before the profile is stored, so a profile whose class cannot be loaded is never
+     * persisted. A trigger already registered under the same job id is destroyed and
+     * replaced instead of being left running next to an untracked new one.
+     *
+     * @param triggerProfile - trigger profile
+     */
+    public void submitTrigger(TriggerProfile triggerProfile) throws ClassNotFoundException,
+            InstantiationException, IllegalAccessException, IOException {
+        // make sure all required key exists.
+        if (!triggerProfile.allRequiredKeyExist()) {
+            return;
+        }
+        Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
+        Trigger trigger = (Trigger) triggerClass.newInstance();
+        triggerProfileDB.storeTrigger(triggerProfile);
+        Trigger previous = triggerMap.put(triggerProfile.get(JobConstants.JOB_ID), trigger);
+        if (previous != null) {
+            previous.destroy();
+        }
+        trigger.init(triggerProfile);
+        trigger.run();
+    }
+
+    /**
+     * build the worker that polls every trigger for a job profile and submits the
+     * non-null ones to the job manager, sleeping for the fetch interval between rounds.
+     *
+     * @return - runnable that loops while the daemon is runnable
+     */
+    private Runnable jobFetchThread() {
+        return () -> {
+            while (isRunnable()) {
+                triggerMap.forEach((id, trigger) -> {
+                    try {
+                        JobProfile profile = trigger.fetchJobProfile();
+                        if (profile != null) {
+                            manager.getJobManager().submitJobProfile(profile);
+                        }
+                    } catch (RuntimeException ex) {
+                        // a failing trigger must not skip the other triggers or the sleep
+                        LOGGER.warn("failed to fetch job profile from trigger {}", id, ex);
+                    }
+                });
+                try {
+                    TimeUnit.SECONDS.sleep(triggerFetchInterval);
+                } catch (InterruptedException ex) {
+                    // restore the interrupt flag and end the worker
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        };
+    }
+
+    /**
+     * delete trigger by trigger id: destroy it and remove its profile from db.
+     * An id that is not registered is ignored.
+     *
+     * @param triggerId - trigger id.
+     */
+    public void deleteTrigger(String triggerId) {
+        Trigger trigger = triggerMap.remove(triggerId);
+        if (trigger != null) {
+            trigger.destroy();
+            // delete trigger from db
+            triggerProfileDB.deleteTrigger(triggerId);
+        }
+    }
+
+    /**
+     * init all triggers when daemon started.
+     */
+    private void initTriggers() throws Exception {
+        // fetch all triggers from db
+        List<TriggerProfile> profileList = triggerProfileDB.getTriggers();
+        for (TriggerProfile profile : profileList) {
+            submitTrigger(profile);
+        }
+    }
+
+    /**
+     * destroy every registered trigger; entries stay in the map.
+     */
+    private void stopTriggers() {
+        triggerMap.values().forEach(Trigger::destroy);
+    }
+
+    /**
+     * restore the triggers stored in db, then start the job fetch worker.
+     */
+    @Override
+    public void start() throws Exception {
+        initTriggers();
+        submitWorker(jobFetchThread());
+    }
+
+    /**
+     * destroy all triggers.
+     * NOTE(review): the job fetch worker is not stopped or awaited here - confirm
+     * that AbstractDaemon takes care of it.
+     */
+    @Override
+    public void stop() {
+        // stop all triggers
+        stopTriggers();
+    }
+}