You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/07 13:12:35 UTC
[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-497] add
trigger manager for agent (#384)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new 7fdced8 [TUBEMQ-497] add trigger manager for agent (#384)
7fdced8 is described below
commit 7fdced8f54045a28c022bea9122f8b9a1f758ae5
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Thu Jan 7 21:12:27 2021 +0800
[TUBEMQ-497] add trigger manager for agent (#384)
---
.../apache/tubemq/agent/conf/ProfileFetcher.java | 36 ++++++
.../org/apache/tubemq/agent/core/AgentManager.java | 31 +++++
.../tubemq/agent/core/trigger/TriggerManager.java | 126 +++++++++++++++++++++
3 files changed, 193 insertions(+)
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java
new file mode 100644
index 0000000..ec47e63
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/ProfileFetcher.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.agent.conf;
+
+import java.util.List;
+import org.apache.tubemq.agent.common.Service;
+
+/**
+ * Fetches profiles from another system; profiles are exchanged as JSON-formatted strings.
+ */
+public interface ProfileFetcher extends Service {
+
+ /**
+ * Get the job profiles.
+ * @return - job profile list
+ */
+ List<JobProfile> getJobProfiles();
+
+ /**
+ * Get the trigger profiles.
+ * @return - trigger profile list
+ */
+ List<TriggerProfile> getTriggerProfiles();
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
index 1c8bdc6..d5513ec 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -16,11 +16,14 @@ package org.apache.tubemq.agent.core;
import org.apache.tubemq.agent.common.AbstractDaemon;
import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.ProfileFetcher;
import org.apache.tubemq.agent.constants.AgentConstants;
import org.apache.tubemq.agent.core.job.JobManager;
import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.core.trigger.TriggerManager;
import org.apache.tubemq.agent.db.DB;
import org.apache.tubemq.agent.db.JobProfileDB;
+import org.apache.tubemq.agent.db.TriggerProfileDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,15 +36,20 @@ public class AgentManager extends AbstractDaemon {
private static final Logger LOGGER = LoggerFactory.getLogger(AgentManager.class);
private static JobManager jobManager;
private static TaskManager taskManager;
+ private static TriggerManager triggerManager;
+
private final long waitTime;
+ private final ProfileFetcher fetcher;
private final AgentConfiguration conf;
private final DB db;
public AgentManager() {
conf = AgentConfiguration.getAgentConf();
this.db = initDB();
+ fetcher = initFetcher();
+ triggerManager = new TriggerManager(this, new TriggerProfileDB(db));
jobManager = new JobManager(this, new JobProfileDB(db));
taskManager = new TaskManager(this);
@@ -49,6 +57,21 @@ public class AgentManager extends AbstractDaemon {
AgentConstants.THREAD_POOL_AWAIT_TIME, AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
}
+ /**
+  * Instantiate the profile fetcher whose class name is configured under
+  * {@code AgentConstants.AGENT_FETCHER_CLASSNAME}.
+  *
+  * <p>The fetcher is treated as optional: any failure to load or construct it is logged
+  * and ignored, and the agent carries on without a fetcher.
+  *
+  * @return the fetcher instance, or {@code null} when it cannot be created
+  */
+ private ProfileFetcher initFetcher() {
+     try {
+         // Class#newInstance is deprecated (it rethrows checked constructor exceptions
+         // unchecked); go through the no-arg constructor instead.
+         return (ProfileFetcher)
+             Class.forName(conf.get(AgentConstants.AGENT_FETCHER_CLASSNAME))
+                 .getDeclaredConstructor()
+                 .newInstance();
+     } catch (Exception ex) {
+         // getMessage() is frequently null (e.g. NullPointerException when no class name
+         // is configured), so log the exception's string form, which includes its type.
+         LOGGER.warn("cannot find fetcher, ignore it {}", ex.toString());
+     }
+     return null;
+ }
/**
* init db by class name
@@ -85,8 +108,12 @@ public class AgentManager extends AbstractDaemon {
@Override
public void start() throws Exception {
LOGGER.info("starting agent manager");
+ // managers are started in the order trigger -> job -> task
+ triggerManager.start();
jobManager.start();
taskManager.start();
+ // the fetcher is optional: initFetcher() returns null when it cannot be instantiated
+ if (fetcher != null) {
+ fetcher.start();
+ }
}
/**
@@ -96,9 +123,13 @@ public class AgentManager extends AbstractDaemon {
*/
@Override
public void stop() throws Exception {
+ if (fetcher != null) {
+ fetcher.stop();
+ }
// TODO: change job state which is in running state.
LOGGER.info("stopping agent manager");
// close in order: trigger -> job -> task
+ triggerManager.stop();
jobManager.stop();
taskManager.stop();
this.db.close();
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java
new file mode 100644
index 0000000..df0701f
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/trigger/TriggerManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.trigger;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.db.TriggerProfileDB;
+import org.apache.tubemq.agent.plugin.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * manager for triggers.
+ */
+public class TriggerManager extends AbstractDaemon {
+
+ private final AgentManager manager;
+ private final TriggerProfileDB triggerProfileDB;
+ private final ConcurrentHashMap<String, Trigger> triggerMap;
+ private final AgentConfiguration conf;
+ private final int triggerFetchInterval;
+
+ public TriggerManager(AgentManager manager, TriggerProfileDB triggerProfileDB) {
+ this.conf = AgentConfiguration.getAgentConf();
+ this.manager = manager;
+ this.triggerProfileDB = triggerProfileDB;
+ this.triggerMap = new ConcurrentHashMap<>();
+ this.triggerFetchInterval = conf.getInt(AgentConstants.TRIGGER_FETCH_INTERVAL,
+ AgentConstants.DEFAULT_TRIGGER_FETCH_INTERVAL);
+ }
+
+ /**
+ * submit trigger profile.
+ * @param triggerProfile - trigger profile
+ */
+ public void submitTrigger(TriggerProfile triggerProfile) throws ClassNotFoundException,
+ InstantiationException, IllegalAccessException, IOException {
+ // make sure all required key exists.
+ if (triggerProfile.allRequiredKeyExist()) {
+ triggerProfileDB.storeTrigger(triggerProfile);
+ Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
+ Trigger trigger = (Trigger) triggerClass.newInstance();
+ triggerMap.putIfAbsent(triggerProfile.get(JobConstants.JOB_ID), trigger);
+ trigger.init(triggerProfile);
+ trigger.run();
+ }
+ }
+
+ private Runnable jobFetchThread() {
+ return () -> {
+ while (isRunnable()) {
+ try {
+ triggerMap.forEach((s, trigger) -> {
+ JobProfile profile = trigger.fetchJobProfile();
+ if (profile != null) {
+ manager.getJobManager().submitJobProfile(profile);
+ }
+ });
+ TimeUnit.SECONDS.sleep(triggerFetchInterval);
+ } catch (Exception ignored) {}
+ }
+
+ };
+ }
+
+ /**
+ * delete trigger by trigger id.
+ * @param triggerId - trigger id.
+ */
+ public void deleteTrigger(String triggerId) {
+ Trigger trigger = triggerMap.remove(triggerId);
+ if (trigger != null) {
+ trigger.destroy();
+ // delete trigger from db
+ triggerProfileDB.deleteTrigger(triggerId);
+ }
+ }
+
+ /**
+ * init all triggers when daemon started.
+ */
+ private void initTriggers() throws Exception {
+ // fetch all triggers from db
+ List<TriggerProfile> profileList = triggerProfileDB.getTriggers();
+ for (TriggerProfile profile : profileList) {
+ submitTrigger(profile);
+ }
+ }
+
+ private void stopTriggers() {
+ triggerMap.forEach((s, trigger) -> {
+ trigger.destroy();
+ });
+ }
+
+ @Override
+ public void start() throws Exception {
+ initTriggers();
+ submitWorker(jobFetchThread());
+ }
+
+ @Override
+ public void stop() {
+ // stop all triggers
+ stopTriggers();
+ }
+}