You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/08 09:19:46 UTC

[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-498] add conf jetty for agent

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new d172f00  [TUBEMQ-498] add conf jetty for agent
d172f00 is described below

commit d172f00abd8bef9acf8fd98439f32ef13ba5648e
Author: yuanbo <yu...@apache.org>
AuthorDate: Thu Jan 7 21:19:40 2021 +0800

    [TUBEMQ-498] add conf jetty for agent
---
 .../org/apache/tubemq/agent/core/AgentManager.java | 11 +++
 .../apache/tubemq/agent/core/conf/ConfigJetty.java | 86 ++++++++++++++++++++++
 .../tubemq/agent/core/conf/ConfigServlet.java      | 79 ++++++++++++++++++++
 .../tubemq/agent/core/conf/ResponseResult.java     | 45 +++++++++++
 4 files changed, 221 insertions(+)

diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
index d5513ec..879010d 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -18,6 +18,7 @@ import org.apache.tubemq.agent.common.AbstractDaemon;
 import org.apache.tubemq.agent.conf.AgentConfiguration;
 import org.apache.tubemq.agent.conf.ProfileFetcher;
 import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.conf.ConfigJetty;
 import org.apache.tubemq.agent.core.job.JobManager;
 import org.apache.tubemq.agent.core.task.TaskManager;
 import org.apache.tubemq.agent.core.trigger.TriggerManager;
@@ -39,6 +40,8 @@ public class AgentManager extends AbstractDaemon {
     private static TriggerManager triggerManager;
 
 
+    // jetty for config operations via http.
+    private ConfigJetty configJetty;
 
     private final long waitTime;
     private final ProfileFetcher fetcher;
@@ -53,6 +56,11 @@ public class AgentManager extends AbstractDaemon {
         jobManager = new JobManager(this, new JobProfileDB(db));
         taskManager = new TaskManager(this);
 
+        // need to be an option.
+        if (conf.getBoolean(
+            AgentConstants.AGENT_ENABLE_HTTP, AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
+            this.configJetty = new ConfigJetty(jobManager);
+        }
         this.waitTime = conf.getLong(
             AgentConstants.THREAD_POOL_AWAIT_TIME, AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
     }
@@ -123,6 +131,9 @@ public class AgentManager extends AbstractDaemon {
      */
     @Override
     public void stop() throws Exception {
+        if (configJetty != null) {
+            configJetty.close();
+        }
         if (fetcher != null) {
             fetcher.stop();
         }
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java
new file mode 100644
index 0000000..9161ce9
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+import java.io.Closeable;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * start http server and get job/agent config via http
+ */
+public class ConfigJetty implements Closeable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigJetty.class);
+
+    private final AgentConfiguration conf;
+    private final Server server;
+    private final JobManager jobManager;
+
+    public ConfigJetty(JobManager jobManager) {
+        this.conf = AgentConfiguration.getAgentConf();
+        this.jobManager = jobManager;
+        server = new Server();
+        try {
+            initJetty();
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        }
+    }
+
+    private void initJetty() throws Exception {
+        ServerConnector connector = new ServerConnector(this.server);
+        connector.setPort(conf.getInt(
+            AgentConstants.AGENT_HTTP_PORT, AgentConstants.DEFAULT_AGENT_HTTP_PORT));
+        server.setConnectors(new Connector[] { connector });
+
+        ServletHandler servletHandler = new ServletHandler();
+        ServletHolder holder = new ServletHolder(new ConfigServlet(this));
+        servletHandler.addServletWithMapping(holder, "/config/*");
+        server.setHandler(servletHandler);
+        server.start();
+    }
+
+    public void storeJobConf(JobProfile jobProfile) {
+        // store job conf to bdb
+        jobManager.submitJobProfile(jobProfile);
+    }
+
+    public void storeAgentConf(String confJsonStr) {
+        // store agent conf to local file
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        conf.loadJsonStrResource(confJsonStr);
+        conf.flushToLocalPropertiesFile();
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (this.server != null) {
+                this.server.stop();
+            }
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        }
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java
new file mode 100644
index 0000000..94fd593
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+import com.google.gson.Gson;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.IOUtils;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigServlet extends HttpServlet {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigServlet.class);
+
+    private static final String CONTENT_TYPE = "application/json";
+    private static final String CHARSET_TYPE = "UTF-8";
+    private final Gson gson = new Gson();
+    private final ConfigJetty configCenter;
+
+    public ConfigServlet(ConfigJetty configCenter) {
+        this.configCenter = configCenter;
+    }
+
+    public void responseToJson(HttpServletResponse response,
+        ResponseResult result) throws IOException {
+        response.setContentType(CONTENT_TYPE);
+        response.setCharacterEncoding(CHARSET_TYPE);
+        String jsonStr = gson.toJson(result);
+        PrintWriter out = response.getWriter();
+        out.print(jsonStr);
+        out.flush();
+    }
+
+    /**
+     * handle post requests.
+     *
+     * @param req  - request
+     * @param resp - response
+     */
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+        String pathInfo = req.getPathInfo();
+        ResponseResult responseResult = new ResponseResult(0, "");
+
+        try (BufferedReader reader = req.getReader()) {
+            String configJsonStr = IOUtils.toString(reader);
+            if (pathInfo.endsWith("job")) {
+                JobProfile jobConfiguration = JobProfile.parseJsonStr(configJsonStr);
+                configCenter.storeJobConf(jobConfiguration);
+            } else if (pathInfo.endsWith("agent")) {
+                // TODO add new agent configuration
+                configCenter.storeAgentConf(configJsonStr);
+            } else {
+                responseResult.setCode(-1).setMessage("child path is not correct");
+            }
+        } catch (Exception ex) {
+            LOGGER.error("error while handle post", ex);
+            responseResult.setCode(-1).setMessage(ex.getMessage());
+        }
+        responseToJson(resp, responseResult);
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java
new file mode 100644
index 0000000..7ca9e20
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+/**
+ * response json for http requests.
+ */
+public class ResponseResult {
+    private int code;
+    private String message;
+
+    public int getCode() {
+        return code;
+    }
+
+    public ResponseResult(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    public ResponseResult setCode(int code) {
+        this.code = code;
+        return this;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public ResponseResult setMessage(String message) {
+        this.message = message;
+        return this;
+    }
+}