You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/04/29 16:44:41 UTC

[systemds] branch main updated: [SYSTEMDS-3349] Base monitoring backend structure to handle REST requests

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f60acf657 [SYSTEMDS-3349] Base monitoring backend structure to handle REST requests
8f60acf657 is described below

commit 8f60acf6570e74544e87d54678e23cf8323d5e69
Author: Mito <mk...@arakt.com>
AuthorDate: Tue Apr 12 16:32:49 2022 +0200

    [SYSTEMDS-3349] Base monitoring backend structure to handle REST requests
    
    This commit implements a simple base MVC framework with dynamic parsing
    of REST requests, which will serve as the backend of the monitoring tool
    for coordinators and workers. Specifically, it starts a server that
    parses the request URI dynamically, routes the request to the matching
    controller (provided the controller is registered) for further
    processing, and returns the constructed response.
    
    Closes #1583
---
 .../monitoring/FederatedMonitoringServer.java      |  64 ++++++++++
 .../FederatedMonitoringServerHandler.java          | 129 +++++++++++++++++++++
 .../federated/monitoring/Request.java              |  43 +++++++
 .../federated/monitoring/Response.java             |  53 +++++++++
 .../monitoring/controllers/BaseController.java     |  36 ++++++
 .../controllers/CoordinatorController.java         |  51 ++++++++
 6 files changed, 376 insertions(+)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
new file mode 100644
index 0000000000..61bc6e5dc3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+/**
+ * HTTP entry point of the federated monitoring backend.
+ *
+ * <p>Each accepted connection gets a pipeline consisting of an {@link HttpServerCodec}
+ * followed by a new {@link FederatedMonitoringServerHandler}.
+ */
+public class FederatedMonitoringServer {
+    /** Port used when the caller passes {@code -1}. */
+    private static final int DEFAULT_PORT = 4201;
+
+    private final int _port;
+
+    /**
+     * @param port port to bind to; {@code -1} selects the default port (4201)
+     */
+    public FederatedMonitoringServer(int port) {
+        if (port == -1) {
+            _port = DEFAULT_PORT;
+        } else {
+            _port = port;
+        }
+    }
+
+    /**
+     * Binds the server to the configured port and blocks the calling thread until the
+     * server channel is closed. Both event loop groups are shut down on the way out,
+     * whether the method returns normally or throws.
+     *
+     * @throws Exception if binding or waiting on the channel fails
+     */
+    public void run() throws Exception {
+        final EventLoopGroup acceptGroup = new NioEventLoopGroup();
+        final EventLoopGroup ioGroup = new NioEventLoopGroup();
+
+        try {
+            final ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(acceptGroup, ioGroup);
+            bootstrap.channel(NioServerSocketChannel.class);
+            bootstrap.childHandler(newPipelineInitializer());
+
+            final ChannelFuture bound = bootstrap.bind(_port).sync();
+            // Park here until somebody closes the listening channel.
+            bound.channel().closeFuture().sync();
+        } finally {
+            ioGroup.shutdownGracefully();
+            acceptGroup.shutdownGracefully();
+        }
+    }
+
+    /** Creates the initializer that installs the HTTP codec and the request handler. */
+    private static ChannelInitializer<Channel> newPipelineInitializer() {
+        return new ChannelInitializer<Channel>() {
+            @Override
+            protected void initChannel(Channel ch) {
+                final ChannelPipeline handlers = ch.pipeline();
+                handlers.addLast(new HttpServerCodec());
+                handlers.addLast(new FederatedMonitoringServerHandler());
+            }
+        };
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
new file mode 100644
index 0000000000..ac392b5000
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServerHandler.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.BaseController;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.CoordinatorController;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class FederatedMonitoringServerHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+    private final Map<String, BaseController> _allControllers = new HashMap<>();
+    {
+        _allControllers.put("/coordinators", new CoordinatorController());
+    }
+
+    private final static ThreadLocal<Request> _currentRequest = new ThreadLocal<>();
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+
+        if (msg instanceof LastHttpContent) {
+            final ByteBuf jsonBuf = ((LastHttpContent) msg).content();
+            final Request request = _currentRequest.get();
+            request.setBody(jsonBuf.toString(CharsetUtil.UTF_8));
+
+            _currentRequest.remove();
+
+            final FullHttpResponse response = processRequest(request);
+            ctx.write(response);
+
+        } else if (msg instanceof HttpRequest) {
+            final HttpRequest httpRequest = (HttpRequest) msg;
+            final Request request = new Request();
+            request.setContext(httpRequest);
+
+            _currentRequest.set(request);
+        }
+
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+    private FullHttpResponse processRequest(final Request request) {
+        try {
+            final BaseController controller = parseController(request.getContext().uri());
+            final String method = request.getContext().method().name();
+
+            switch (method) {
+                case "GET":
+                    final Long id = parseId(request.getContext().uri());
+
+                    if (id != null) {
+                        return controller.get(request, id);
+                    }
+
+                    return controller.getAll(request);
+                case "PUT":
+                    return controller.create(request);
+                case "POST":
+                    return controller.update(request, parseId(request.getContext().uri()));
+                case "DELETE":
+                    return controller.delete(request, parseId(request.getContext().uri()));
+                default:
+                    throw new IllegalArgumentException("Method is not supported!");
+            }
+        } catch (RuntimeException ex) {
+            ex.printStackTrace();
+            return null;
+        }
+    }
+
+    private BaseController parseController(final String currentPath) {
+        final Optional<String> controller = _allControllers.keySet().stream()
+                .filter(currentPath::startsWith)
+                .findFirst();
+
+        return controller.map(_allControllers::get).orElseThrow(() ->
+                new IllegalArgumentException("Such controller does not exist!"));
+    }
+
+    private Long parseId(final String uri) {
+        final Pattern pattern = Pattern.compile("^[/][a-z]+[/]");
+        final Matcher matcher = pattern.matcher(uri);
+
+        if (matcher.find()) {
+            return Long.valueOf(uri.substring(matcher.end()));
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
new file mode 100644
index 0000000000..b9d71fe428
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Request.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+/**
+ * Mutable holder that pairs the Netty {@link HttpRequest} of an incoming call with the
+ * text of its body. Both values are {@code null} until the corresponding setter is called.
+ */
+public class Request {
+    private String _body;
+    private HttpRequest _context;
+
+    /**
+     * @return the HTTP request object stored by {@link #setContext(HttpRequest)}
+     */
+    public HttpRequest getContext() {
+        return _context;
+    }
+
+    /**
+     * @param requestContext the HTTP request object to associate with this request
+     */
+    public void setContext(final HttpRequest requestContext) {
+        _context = requestContext;
+    }
+
+    /**
+     * @return the body text stored by {@link #setBody(String)}
+     */
+    public String getBody() {
+        return _body;
+    }
+
+    /**
+     * @param content the body text to associate with this request
+     */
+    public void setBody(final String content) {
+        _body = content;
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
new file mode 100644
index 0000000000..7a3814835b
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/Response.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
+/**
+ * Factory methods for the HTTP responses returned by the monitoring controllers.
+ *
+ * <p>Bodies are always encoded as UTF-8 so that the bytes on the wire (and the
+ * {@code Content-Length} header derived from them) do not depend on the JVM's
+ * platform default charset.
+ */
+public class Response {
+    /**
+     * Builds a {@code 200 OK} response with content type {@code application/json}.
+     *
+     * @param result the response body
+     * @return the complete HTTP/1.1 response
+     */
+    public static FullHttpResponse ok(final String result) {
+        return build(HttpResponseStatus.OK, "application/json", result);
+    }
+
+    /**
+     * Builds a {@code 404 Not Found} response with content type {@code text/plain}.
+     *
+     * @param exception the message to send as response body
+     * @return the complete HTTP/1.1 response
+     */
+    public static FullHttpResponse notFound(final String exception) {
+        return build(HttpResponseStatus.NOT_FOUND, "text/plain", exception);
+    }
+
+    /** Assembles a full response with the given status, content type and UTF-8 encoded body. */
+    private static FullHttpResponse build(final HttpResponseStatus status, final String contentType,
+            final String body) {
+        final FullHttpResponse response = new DefaultFullHttpResponse(
+                HttpVersion.HTTP_1_1,
+                status,
+                Unpooled.wrappedBuffer(body.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
+        // Length is taken from the encoded buffer, not from the String, so multi-byte characters count correctly.
+        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+        return response;
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
new file mode 100644
index 0000000000..34a415b6e3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/BaseController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+
+import io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+
+/**
+ * Contract implemented by every controller of the monitoring backend. Each method
+ * receives the incoming {@link Request} and returns the HTTP response to send back.
+ */
+public interface BaseController {
+
+    /** Handles a read of the single object identified by {@code objectId}. */
+    FullHttpResponse get(Request request, Long objectId);
+
+    /** Handles a read of all objects managed by this controller. */
+    FullHttpResponse getAll(Request request);
+
+    /** Handles the creation of a new object described by the request. */
+    FullHttpResponse create(Request request);
+
+    /** Handles a modification of the object identified by {@code objectId}. */
+    FullHttpResponse update(Request request, Long objectId);
+
+    /** Handles the removal of the object identified by {@code objectId}. */
+    FullHttpResponse delete(Request request, Long objectId);
+}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
new file mode 100644
index 0000000000..be807721b9
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/CoordinatorController.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+
+import io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.Response;
+
+/**
+ * Controller for coordinator resources. Reads currently answer with a fixed success
+ * body; write operations are not implemented yet and answer with an explicit error
+ * response rather than {@code null}, so callers always have a response to send.
+ */
+public class CoordinatorController implements BaseController {
+    @Override
+    public FullHttpResponse create(Request request) {
+        return notImplemented("create");
+    }
+
+    @Override
+    public FullHttpResponse update(Request request, Long objectId) {
+        return notImplemented("update");
+    }
+
+    @Override
+    public FullHttpResponse delete(Request request, Long objectId) {
+        return notImplemented("delete");
+    }
+
+    @Override
+    public FullHttpResponse get(Request request, Long objectId) {
+        return Response.ok("Success");
+    }
+
+    @Override
+    public FullHttpResponse getAll(Request request) {
+        return Response.ok("Success");
+    }
+
+    /**
+     * Builds the response for an operation this controller does not support yet.
+     * {@link Response} only offers {@code ok} and {@code notFound}, so not-found is the
+     * closest status available here.
+     */
+    private static FullHttpResponse notImplemented(final String operation) {
+        return Response.notFound("Operation '" + operation + "' is not implemented for coordinators.");
+    }
+}