You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/04/29 16:44:41 UTC
[systemds] branch main updated: [SYSTEMDS-3349] Base monitoring backend structure to handle REST requests
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 8f60acf657 [SYSTEMDS-3349] Base monitoring backend structure to handle REST requests
8f60acf657 is described below
commit 8f60acf6570e74544e87d54678e23cf8323d5e69
Author: Mito <mk...@arakt.com>
AuthorDate: Tue Apr 12 16:32:49 2022 +0200
[SYSTEMDS-3349] Base monitoring backend structure to handle REST requests
This commit implements a simple base MVC framework, with dynamic parsing
of REST requests, which will serve as a backend to the monitoring tool
for coordinators and workers. Specifically, it starts a server which
parses the uri dynamically and reroutes the request to the right
controller (provided the controller is registered) for further
request processing and returns the constructed response.
Closes #1583
---
.../monitoring/FederatedMonitoringServer.java | 64 ++++++++++
.../FederatedMonitoringServerHandler.java | 129 +++++++++++++++++++++
.../federated/monitoring/Request.java | 43 +++++++
.../federated/monitoring/Response.java | 53 +++++++++
.../monitoring/controllers/BaseController.java | 36 ++++++
.../controllers/CoordinatorController.java | 51 ++++++++
6 files changed, 376 insertions(+)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
new file mode 100644
index 0000000000..61bc6e5dc3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+/**
+ * Minimal Netty-based HTTP server for the federated monitoring backend.
+ * Every accepted connection gets an {@link HttpServerCodec} followed by a
+ * {@link FederatedMonitoringServerHandler} in its pipeline.
+ */
+public class FederatedMonitoringServer {
+	/** Port that is used when the constructor receives {@code -1}. */
+	private static final int DEFAULT_PORT = 4201;
+
+	private final int _port;
+
+	/**
+	 * Creates a server for the given port.
+	 *
+	 * @param port port to bind to; {@code -1} selects the default port 4201
+	 */
+	public FederatedMonitoringServer(int port) {
+		if (port == -1) {
+			_port = DEFAULT_PORT;
+		} else {
+			_port = port;
+		}
+	}
+
+	/**
+	 * Binds the server to the configured port and then waits until the server
+	 * channel has been closed. Both event loop groups are shut down on the way
+	 * out, whether the method returns normally or throws.
+	 *
+	 * @throws Exception if binding or waiting on the channel futures fails
+	 */
+	public void run() throws Exception {
+		final EventLoopGroup acceptorGroup = new NioEventLoopGroup();
+		final EventLoopGroup ioGroup = new NioEventLoopGroup();
+
+		try {
+			final ServerBootstrap bootstrap = new ServerBootstrap();
+			bootstrap.group(acceptorGroup, ioGroup);
+			bootstrap.channel(NioServerSocketChannel.class);
+			bootstrap.childHandler(newPipelineInitializer());
+
+			final ChannelFuture bound = bootstrap.bind(_port).sync();
+			bound.channel().closeFuture().sync();
+		} finally {
+			ioGroup.shutdownGracefully();
+			acceptorGroup.shutdownGracefully();
+		}
+	}
+
+	/** Builds the initializer that installs the HTTP codec and the monitoring handler on each new channel. */
+	private static ChannelInitializer<Channel> newPipelineInitializer() {
+		return new ChannelInitializer<Channel>() {
+			@Override
+			protected void initChannel(Channel ch) {
+				final ChannelPipeline handlers = ch.pipeline();
+
+				handlers.addLast(new HttpServerCodec());
+				handlers.addLast(new FederatedMonitoringServerHandler());
+			}
+		};
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
new file mode 100644
index 0000000000..ac392b5000
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.BaseController;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.CoordinatorController;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Inbound handler that assembles a {@link Request} from a decoded HTTP request
+ * head and its final content chunk, dispatches it to the controller registered
+ * for the URI prefix, and writes the resulting response to the channel.
+ *
+ * <p>NOTE(review): only the content of the {@link LastHttpContent} message is
+ * stored as the request body; earlier content chunks of the same request are
+ * not accumulated. Confirm whether bodies spanning several chunks are expected.
+ */
+public class FederatedMonitoringServerHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+	/** Matches the leading "/controller/" segment of a URI; the remainder is taken as the object id. */
+	private static final Pattern ID_PREFIX = Pattern.compile("^[/][a-z]+[/]");
+
+	private final Map<String, BaseController> _allControllers = new HashMap<>();
+	{
+		_allControllers.put("/coordinators", new CoordinatorController());
+	}
+
+	// Request whose head has been received but whose last content chunk has not.
+	// Kept per handler instance rather than per thread: an event loop thread may
+	// serve several channels, so thread-scoped state could pair the head of one
+	// connection with the body of another. This handler is not marked sharable,
+	// so an instance is expected to belong to a single channel pipeline.
+	private Request _currentRequest;
+
+	/**
+	 * Tracks the request head and, once the last content chunk arrives, processes
+	 * the request and writes the response (flushed in {@link #channelReadComplete}).
+	 *
+	 * @param ctx channel context used for writing the response
+	 * @param msg decoded HTTP message part
+	 */
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+
+		// The head is checked first (and not as an alternative to the body) so that a
+		// message implementing both HttpRequest and LastHttpContent is handled too.
+		if (msg instanceof HttpRequest) {
+			final Request started = new Request();
+			started.setContext((HttpRequest) msg);
+
+			_currentRequest = started;
+		}
+
+		if (msg instanceof LastHttpContent) {
+			final Request request = _currentRequest;
+			_currentRequest = null;
+
+			if (request == null) {
+				// Content arrived without a preceding request head; there is nothing
+				// to answer, so the connection is dropped instead of failing with an NPE.
+				ctx.close();
+				return;
+			}
+
+			final ByteBuf jsonBuf = ((LastHttpContent) msg).content();
+			request.setBody(jsonBuf.toString(CharsetUtil.UTF_8));
+
+			ctx.write(processRequest(request));
+		}
+
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) {
+		ctx.flush();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+		cause.printStackTrace();
+		ctx.close();
+	}
+
+	/**
+	 * Routes the request to its controller based on URI prefix and HTTP method.
+	 * GET with an id calls {@code get}, GET without an id calls {@code getAll},
+	 * PUT calls {@code create}, POST calls {@code update}, DELETE calls {@code delete}.
+	 *
+	 * <p>NOTE(review): PUT-to-create and POST-to-update is the reverse of the
+	 * common REST convention; the original mapping is kept - confirm it is intended.
+	 *
+	 * @param request fully assembled request
+	 * @return the controller's response, or a 404 response carrying an error
+	 *         message when routing fails or the controller yields no response;
+	 *         never {@code null}
+	 */
+	private FullHttpResponse processRequest(final Request request) {
+		try {
+			final String uri = request.getContext().uri();
+			final BaseController controller = parseController(uri);
+			final String method = request.getContext().method().name();
+
+			final FullHttpResponse response;
+			switch (method) {
+				case "GET":
+					final Long id = parseId(uri);
+					response = (id != null) ? controller.get(request, id) : controller.getAll(request);
+					break;
+				case "PUT":
+					response = controller.create(request);
+					break;
+				case "POST":
+					response = controller.update(request, parseId(uri));
+					break;
+				case "DELETE":
+					response = controller.delete(request, parseId(uri));
+					break;
+				default:
+					throw new IllegalArgumentException("Method is not supported!");
+			}
+
+			// Writing null to the channel would throw; answer explicitly instead.
+			if (response == null) {
+				return Response.notFound("Controller did not produce a response!");
+			}
+			return response;
+		} catch (RuntimeException ex) {
+			ex.printStackTrace();
+			final String reason = ex.getMessage();
+			return Response.notFound(reason != null ? reason : ex.getClass().getName());
+		}
+	}
+
+	/**
+	 * Finds the controller whose registered path is a prefix of the given path.
+	 *
+	 * @param currentPath request URI
+	 * @return the matching controller
+	 * @throws IllegalArgumentException if no registered path matches
+	 */
+	private BaseController parseController(final String currentPath) {
+		final Optional<String> controller = _allControllers.keySet().stream()
+			.filter(currentPath::startsWith)
+			.findFirst();
+
+		return controller.map(_allControllers::get).orElseThrow(() ->
+			new IllegalArgumentException("Such controller does not exist!"));
+	}
+
+	/**
+	 * Extracts the numeric object id that follows the "/controller/" segment.
+	 * A query string, if present, is ignored.
+	 *
+	 * @param uri request URI
+	 * @return the id, or {@code null} when the URI has no id segment or the
+	 *         segment is empty (for example "/coordinators" or "/coordinators/")
+	 * @throws NumberFormatException if the id segment is not a valid long
+	 */
+	private Long parseId(final String uri) {
+		final int queryStart = uri.indexOf('?');
+		final String path = (queryStart >= 0) ? uri.substring(0, queryStart) : uri;
+
+		final Matcher matcher = ID_PREFIX.matcher(path);
+		if (!matcher.find()) {
+			return null;
+		}
+
+		final String idPart = path.substring(matcher.end());
+		if (idPart.isEmpty()) {
+			return null;
+		}
+		return Long.valueOf(idPart);
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
new file mode 100644
index 0000000000..b9d71fe428
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+/**
+ * Mutable holder that pairs a Netty {@link HttpRequest} with the request body
+ * as text. Both values start out as {@code null} until they are set.
+ */
+public class Request {
+	private HttpRequest _context;
+	private String _body;
+
+	/**
+	 * Stores the HTTP request this object wraps.
+	 *
+	 * @param requestContext the HTTP request to keep
+	 */
+	public void setContext(final HttpRequest requestContext) {
+		_context = requestContext;
+	}
+
+	/**
+	 * @return the HTTP request previously stored, or {@code null} if none was set
+	 */
+	public HttpRequest getContext() {
+		return _context;
+	}
+
+	/**
+	 * Stores the request body.
+	 *
+	 * @param content body text to keep
+	 */
+	public void setBody(final String content) {
+		_body = content;
+	}
+
+	/**
+	 * @return the body previously stored, or {@code null} if none was set
+	 */
+	public String getBody() {
+		return _body;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
new file mode 100644
index 0000000000..7a3814835b
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Factory methods for the HTTP responses returned by the monitoring controllers.
+ * Payloads are always encoded as UTF-8 so the bytes on the wire do not depend
+ * on the platform default charset.
+ */
+public class Response {
+	/**
+	 * Builds a 200 OK response with content type {@code application/json}.
+	 *
+	 * @param result response payload; {@code null} is sent as an empty body
+	 * @return the response with Content-Type and Content-Length headers set
+	 */
+	public static FullHttpResponse ok(final String result) {
+		return build(HttpResponseStatus.OK, "application/json", result);
+	}
+
+	/**
+	 * Builds a 404 Not Found response with content type {@code text/plain}.
+	 *
+	 * @param exception error message used as payload; {@code null} is sent as an empty body
+	 * @return the response with Content-Type and Content-Length headers set
+	 */
+	public static FullHttpResponse notFound(final String exception) {
+		return build(HttpResponseStatus.NOT_FOUND, "text/plain", exception);
+	}
+
+	/** Creates an HTTP/1.1 response with the given status, content type and UTF-8 encoded body. */
+	private static FullHttpResponse build(final HttpResponseStatus status, final String contentType,
+		final String body) {
+		final byte[] payload = (body == null ? "" : body).getBytes(StandardCharsets.UTF_8);
+
+		final FullHttpResponse response = new DefaultFullHttpResponse(
+			HttpVersion.HTTP_1_1,
+			status,
+			Unpooled.wrappedBuffer(payload));
+
+		response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
+		response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+		return response;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
new file mode 100644
index 0000000000..34a415b6e3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+
+import io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+
+/**
+ * Contract for the controllers of the monitoring backend. Each method receives
+ * the parsed {@link Request} and returns the HTTP response to send back.
+ */
+public interface BaseController {
+
+	/**
+	 * Handles a request that carries no object id and is meant to create an object.
+	 *
+	 * @param request the incoming request
+	 * @return the response to send
+	 */
+	FullHttpResponse create(Request request);
+
+	/**
+	 * Handles a request to update the object with the given id.
+	 *
+	 * @param request the incoming request
+	 * @param objectId id taken from the request URI
+	 * @return the response to send
+	 */
+	FullHttpResponse update(Request request, Long objectId);
+
+	/**
+	 * Handles a request to delete the object with the given id.
+	 *
+	 * @param request the incoming request
+	 * @param objectId id taken from the request URI
+	 * @return the response to send
+	 */
+	FullHttpResponse delete(Request request, Long objectId);
+
+	/**
+	 * Handles a request for the single object with the given id.
+	 *
+	 * @param request the incoming request
+	 * @param objectId id taken from the request URI
+	 * @return the response to send
+	 */
+	FullHttpResponse get(Request request, Long objectId);
+
+	/**
+	 * Handles a request for all objects managed by this controller.
+	 *
+	 * @param request the incoming request
+	 * @return the response to send
+	 */
+	FullHttpResponse getAll(Request request);
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
new file mode 100644
index 0000000000..be807721b9
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+
+import io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Response;
+
+/**
+ * Controller for coordinators. The read operations answer with a fixed
+ * success response; the write operations are placeholders that currently
+ * return {@code null}.
+ */
+public class CoordinatorController implements BaseController {
+	/** Body of the response returned by the read operations. */
+	private static final String SUCCESS_BODY = "Success";
+
+	/**
+	 * Placeholder, no behavior yet.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public FullHttpResponse create(final Request request) {
+		return null;
+	}
+
+	/**
+	 * Placeholder, no behavior yet.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public FullHttpResponse update(final Request request, final Long objectId) {
+		return null;
+	}
+
+	/**
+	 * Placeholder, no behavior yet.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public FullHttpResponse delete(final Request request, final Long objectId) {
+		return null;
+	}
+
+	/**
+	 * Answers with a fixed success response; the request and id are not inspected.
+	 *
+	 * @return the response built by {@code Response.ok} for the body "Success"
+	 */
+	@Override
+	public FullHttpResponse get(final Request request, final Long objectId) {
+		final FullHttpResponse success = Response.ok(SUCCESS_BODY);
+		return success;
+	}
+
+	/**
+	 * Answers with a fixed success response; the request is not inspected.
+	 *
+	 * @return the response built by {@code Response.ok} for the body "Success"
+	 */
+	@Override
+	public FullHttpResponse getAll(final Request request) {
+		final FullHttpResponse success = Response.ok(SUCCESS_BODY);
+		return success;
+	}
+}