You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/08 09:19:46 UTC
[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-498] add conf jetty for agent
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new d172f00 [TUBEMQ-498] add conf jetty for agent
d172f00 is described below
commit d172f00abd8bef9acf8fd98439f32ef13ba5648e
Author: yuanbo <yu...@apache.org>
AuthorDate: Thu Jan 7 21:19:40 2021 +0800
[TUBEMQ-498] add conf jetty for agent
---
.../org/apache/tubemq/agent/core/AgentManager.java | 11 +++
.../apache/tubemq/agent/core/conf/ConfigJetty.java | 86 ++++++++++++++++++++++
.../tubemq/agent/core/conf/ConfigServlet.java | 79 ++++++++++++++++++++
.../tubemq/agent/core/conf/ResponseResult.java | 45 +++++++++++
4 files changed, 221 insertions(+)
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
index d5513ec..879010d 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -18,6 +18,7 @@ import org.apache.tubemq.agent.common.AbstractDaemon;
import org.apache.tubemq.agent.conf.AgentConfiguration;
import org.apache.tubemq.agent.conf.ProfileFetcher;
import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.conf.ConfigJetty;
import org.apache.tubemq.agent.core.job.JobManager;
import org.apache.tubemq.agent.core.task.TaskManager;
import org.apache.tubemq.agent.core.trigger.TriggerManager;
@@ -39,6 +40,8 @@ public class AgentManager extends AbstractDaemon {
private static TriggerManager triggerManager;
+ // jetty for config operations via http.
+ private ConfigJetty configJetty;
private final long waitTime;
private final ProfileFetcher fetcher;
@@ -53,6 +56,11 @@ public class AgentManager extends AbstractDaemon {
jobManager = new JobManager(this, new JobProfileDB(db));
taskManager = new TaskManager(this);
+ // need to be an option.
+ if (conf.getBoolean(
+ AgentConstants.AGENT_ENABLE_HTTP, AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
+ this.configJetty = new ConfigJetty(jobManager);
+ }
this.waitTime = conf.getLong(
AgentConstants.THREAD_POOL_AWAIT_TIME, AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
}
@@ -123,6 +131,9 @@ public class AgentManager extends AbstractDaemon {
*/
@Override
public void stop() throws Exception {
+ if (configJetty != null) {
+ configJetty.close();
+ }
if (fetcher != null) {
fetcher.stop();
}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java
new file mode 100644
index 0000000..9161ce9
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigJetty.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+import java.io.Closeable;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * start http server and get job/agent config via http
+ */
+public class ConfigJetty implements Closeable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigJetty.class);
+
+    private final AgentConfiguration conf;
+    private final Server server;
+    private final JobManager jobManager;
+
+    /**
+     * Creates the embedded Jetty server and tries to start it right away.
+     * A failure while configuring or starting the server is logged and not rethrown,
+     * so construction itself does not fail because of it.
+     *
+     * @param jobManager - manager that receives job profiles posted over http
+     */
+    public ConfigJetty(JobManager jobManager) {
+        this.conf = AgentConfiguration.getAgentConf();
+        this.jobManager = jobManager;
+        this.server = new Server();
+        try {
+            initJetty();
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        }
+    }
+
+    /**
+     * Wires the connector and the config servlet into the server, then starts it.
+     *
+     * @throws Exception - whatever the configuration lookup or server start raises
+     */
+    private void initJetty() throws Exception {
+        server.setConnectors(new Connector[] { buildConnector() });
+        server.setHandler(buildServletHandler());
+        server.start();
+    }
+
+    /**
+     * Builds the connector listening on the configured agent http port.
+     */
+    private ServerConnector buildConnector() {
+        ServerConnector httpConnector = new ServerConnector(this.server);
+        int httpPort = conf.getInt(
+            AgentConstants.AGENT_HTTP_PORT, AgentConstants.DEFAULT_AGENT_HTTP_PORT);
+        httpConnector.setPort(httpPort);
+        return httpConnector;
+    }
+
+    /**
+     * Builds the handler that maps {@link ConfigServlet} to "/config/*".
+     */
+    private ServletHandler buildServletHandler() {
+        ServletHandler handler = new ServletHandler();
+        handler.addServletWithMapping(new ServletHolder(new ConfigServlet(this)), "/config/*");
+        return handler;
+    }
+
+    /**
+     * Hands the job profile over to the job manager (store job conf to bdb).
+     *
+     * @param jobProfile - job profile to submit
+     */
+    public void storeJobConf(JobProfile jobProfile) {
+        jobManager.submitJobProfile(jobProfile);
+    }
+
+    /**
+     * Loads the given json into the agent configuration and flushes it
+     * to the local properties file.
+     *
+     * @param confJsonStr - agent configuration as a json string
+     */
+    public void storeAgentConf(String confJsonStr) {
+        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+        agentConf.loadJsonStrResource(confJsonStr);
+        agentConf.flushToLocalPropertiesFile();
+    }
+
+    /**
+     * Stops the http server; a failure while stopping is logged, not rethrown.
+     */
+    @Override
+    public void close() {
+        // server is final and always assigned in the constructor, so no null check is needed.
+        try {
+            this.server.stop();
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        }
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java
new file mode 100644
index 0000000..94fd593
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ConfigServlet.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+import com.google.gson.Gson;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.io.IOUtils;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Servlet that accepts configuration as a json request body via http post.
+ * The last part of the request path selects the target: a path ending with
+ * "job" stores a job profile, a path ending with "agent" stores agent configuration.
+ * Every request is answered with a json encoded {@link ResponseResult}.
+ */
+public class ConfigServlet extends HttpServlet {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigServlet.class);
+
+    private static final String CONTENT_TYPE = "application/json";
+    private static final String CHARSET_TYPE = "UTF-8";
+    private static final String INCORRECT_PATH_MESSAGE = "child path is not correct";
+    private final Gson gson = new Gson();
+    private final ConfigJetty configCenter;
+
+    /**
+     * @param configCenter - receiver of the parsed job / agent configuration
+     */
+    public ConfigServlet(ConfigJetty configCenter) {
+        this.configCenter = configCenter;
+    }
+
+    /**
+     * write the result to the response as UTF-8 encoded json.
+     *
+     * @param response - response to write to
+     * @param result - result to serialize
+     * @throws IOException - if the response writer cannot be obtained
+     */
+    public void responseToJson(HttpServletResponse response,
+        ResponseResult result) throws IOException {
+        response.setContentType(CONTENT_TYPE);
+        response.setCharacterEncoding(CHARSET_TYPE);
+        String jsonStr = gson.toJson(result);
+        PrintWriter out = response.getWriter();
+        out.print(jsonStr);
+        out.flush();
+    }
+
+    /**
+     * handle post requests.
+     * Code 0 is returned on success; code -1 together with a message is returned
+     * when the child path is missing / unknown or when handling the body fails.
+     *
+     * @param req - request
+     * @param resp - response
+     * @throws IOException - if writing the json response fails
+     */
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+        // getPathInfo() is null when there is no extra path after the servlet path,
+        // so it must be checked before calling endsWith on it.
+        String pathInfo = req.getPathInfo();
+        ResponseResult responseResult = new ResponseResult(0, "");
+
+        try (BufferedReader reader = req.getReader()) {
+            String configJsonStr = IOUtils.toString(reader);
+            if (pathInfo == null) {
+                responseResult.setCode(-1).setMessage(INCORRECT_PATH_MESSAGE);
+            } else if (pathInfo.endsWith("job")) {
+                JobProfile jobConfiguration = JobProfile.parseJsonStr(configJsonStr);
+                configCenter.storeJobConf(jobConfiguration);
+            } else if (pathInfo.endsWith("agent")) {
+                // TODO add new agent configuration
+                configCenter.storeAgentConf(configJsonStr);
+            } else {
+                responseResult.setCode(-1).setMessage(INCORRECT_PATH_MESSAGE);
+            }
+        } catch (Exception ex) {
+            LOGGER.error("error while handle post", ex);
+            // some exceptions carry no message; fall back to toString() so the
+            // caller still gets a hint of what went wrong.
+            String errorMessage = ex.getMessage() != null ? ex.getMessage() : ex.toString();
+            responseResult.setCode(-1).setMessage(errorMessage);
+        }
+        responseToJson(resp, responseResult);
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java
new file mode 100644
index 0000000..7ca9e20
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/conf/ResponseResult.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.conf;
+
+/**
+ * response json for http requests.
+ */
+public class ResponseResult {
+    // Field names and their order are kept as they are: the object is turned
+    // into json by Gson in ConfigServlet.
+    private int code;
+    private String message;
+
+    /**
+     * @param code - result code of the request
+     * @param message - message describing the result
+     */
+    public ResponseResult(int code, String message) {
+        this.code = code;
+        this.message = message;
+    }
+
+    /**
+     * @return the result code
+     */
+    public int getCode() {
+        return this.code;
+    }
+
+    /**
+     * @return the result message
+     */
+    public String getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @param code - new result code
+     * @return this instance, so that calls can be chained
+     */
+    public ResponseResult setCode(int code) {
+        this.code = code;
+        return this;
+    }
+
+    /**
+     * @param message - new result message
+     * @return this instance, so that calls can be chained
+     */
+    public ResponseResult setMessage(String message) {
+        this.message = message;
+        return this;
+    }
+}