You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/05 14:59:15 UTC

[GitHub] [inlong] gosonzhang opened a new pull request, #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started

gosonzhang opened a new pull request, #6413:
URL: https://github.com/apache/inlong/pull/6413

   1. I added an error SINK_SERVICE_UNREADY: if MQ cluster is not configured, or if MQ cluster is configured but initialization fails, DataProxy will not serve externally; it will only serve after initialization is successful;
   2. Adjusted the error code of HTTP access and incorporated it into the DataProxyErrCode enumeration
   
   - Fixes #6406 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gosonzhang commented on a diff in pull request #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6413:
URL: https://github.com/apache/inlong/pull/6413#discussion_r1014943721


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java:
##########
@@ -49,81 +51,98 @@
     }
 
     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+    public void doFilter(ServletRequest request,
+                         ServletResponse response,
+                         FilterChain chain) throws IOException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
 
-        int code = StatusCode.SUCCESS;
-        String message = "success";
-
         String pathInfo = req.getPathInfo();
         if (pathInfo.startsWith("/")) {
             pathInfo = pathInfo.substring(1);
         }
         if ("heartbeat".equals(pathInfo)) {
-            resp.setCharacterEncoding(req.getCharacterEncoding());
-            resp.setStatus(HttpServletResponse.SC_OK);
-            resp.flushBuffer();
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
             return;
         }
-
-        String invalidKey = null;
+        // check sink service status
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+            return;
+        }
+        // get and check groupId
         String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+        if (StringUtils.isEmpty(groupId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check streamId
         String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+        if (StringUtils.isEmpty(streamId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check dt
         String dt = req.getParameter(AttributeConstants.DATA_TIME);
+        if (StringUtils.isEmpty(dt)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check body
         String body = req.getParameter(AttrConstants.BODY);
-
-        if (StringUtils.isEmpty(groupId)) {
-            invalidKey = "groupId";
-        } else if (StringUtils.isEmpty(streamId)) {
-            invalidKey = "streamId";
-        } else if (StringUtils.isEmpty(dt)) {
-            invalidKey = "dt";
-        } else if (StringUtils.isEmpty(body)) {
-            invalidKey = "body";
+        if (StringUtils.isEmpty(body)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+            return;
+        }
+        // check body length
+        if (body.length() > maxMsgLength) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+                    "Bad request, body length exceeds the limit:" + maxMsgLength);
+            return;
         }
-
         try {
-            if (invalidKey != null) {
-                LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
-                code = StatusCode.ILLEGAL_ARGUMENT;
-                message = "Bad request from client. " + invalidKey + " must not be empty.";
-            } else if (body.length() > maxMsgLength) {
-                LOG.warn("Received bad request from client. Body length is " + body.length());
-                code = StatusCode.EXCEED_LEN;
-                message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
-            } else {
-                chain.doFilter(request, response);
-            }
+            chain.doFilter(request, response);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
         } catch (Throwable t) {
-            code = StatusCode.SERVICE_ERR;
+            String errMsg;
             if ((t instanceof ChannelException)) {
-                message = "Channel error!";
+                errMsg = "Channel error! " + t.getMessage();
             } else {
-                message = "Service error!";
-                LOG.error("Request error!", t);
+                errMsg = "Service error! " + t.getMessage();
             }
+            LOG.error("Request error!", t);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), errMsg);
         }
-
-        resp.setCharacterEncoding(req.getCharacterEncoding());
-        resp.setStatus(HttpServletResponse.SC_OK);
-        resp.getWriter().write(getResultContent(code, message));
-        resp.flushBuffer();
     }
 
     @Override
     public void destroy() {
     }
 
-    private String getResultContent(int code, String message) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{\"code\":\"");
-        builder.append(code);
-        builder.append("\",\"msg\":\"");
-        builder.append(message);
-        builder.append("\"}");
-
-        return builder.toString();
+    private void returnRspPackage(HttpServletResponse resp, String charEncoding,
+                                  int errCode, String errMsg) throws IOException {
+        StringBuilder builder =
+                new StringBuilder().append("{\"code\":\"").append(errCode)
+                        .append("\",\"msg\":\"").append(errMsg).append("\"}");
+        resp.setCharacterEncoding(charEncoding);
+        resp.setStatus(HttpServletResponse.SC_OK);
+        resp.getWriter().write(builder.toString());

Review Comment:
   The exception in the processing part has been caught, and only the content of the getMessage() part of the exception is taken. If there is an exception in writing to the channel, it should be no problem to throw an error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] github-code-scanning[bot] commented on a diff in pull request #6413: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #6413:
URL: https://github.com/apache/inlong/pull/6413#discussion_r1014650437


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java:
##########
@@ -49,81 +51,98 @@
     }
 
     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+    public void doFilter(ServletRequest request,
+                         ServletResponse response,
+                         FilterChain chain) throws IOException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
 
-        int code = StatusCode.SUCCESS;
-        String message = "success";
-
         String pathInfo = req.getPathInfo();
         if (pathInfo.startsWith("/")) {
             pathInfo = pathInfo.substring(1);
         }
         if ("heartbeat".equals(pathInfo)) {
-            resp.setCharacterEncoding(req.getCharacterEncoding());
-            resp.setStatus(HttpServletResponse.SC_OK);
-            resp.flushBuffer();
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
             return;
         }
-
-        String invalidKey = null;
+        // check sink service status
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+            return;
+        }
+        // get and check groupId
         String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+        if (StringUtils.isEmpty(groupId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check streamId
         String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+        if (StringUtils.isEmpty(streamId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check dt
         String dt = req.getParameter(AttributeConstants.DATA_TIME);
+        if (StringUtils.isEmpty(dt)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check body
         String body = req.getParameter(AttrConstants.BODY);
-
-        if (StringUtils.isEmpty(groupId)) {
-            invalidKey = "groupId";
-        } else if (StringUtils.isEmpty(streamId)) {
-            invalidKey = "streamId";
-        } else if (StringUtils.isEmpty(dt)) {
-            invalidKey = "dt";
-        } else if (StringUtils.isEmpty(body)) {
-            invalidKey = "body";
+        if (StringUtils.isEmpty(body)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+            return;
+        }
+        // check body length
+        if (body.length() > maxMsgLength) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+                    "Bad request, body length exceeds the limit:" + maxMsgLength);
+            return;
         }
-
         try {
-            if (invalidKey != null) {
-                LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
-                code = StatusCode.ILLEGAL_ARGUMENT;
-                message = "Bad request from client. " + invalidKey + " must not be empty.";
-            } else if (body.length() > maxMsgLength) {
-                LOG.warn("Received bad request from client. Body length is " + body.length());
-                code = StatusCode.EXCEED_LEN;
-                message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
-            } else {
-                chain.doFilter(request, response);
-            }
+            chain.doFilter(request, response);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
         } catch (Throwable t) {
-            code = StatusCode.SERVICE_ERR;
+            String errMsg;
             if ((t instanceof ChannelException)) {
-                message = "Channel error!";
+                errMsg = "Channel error! " + t.getMessage();
             } else {
-                message = "Service error!";
-                LOG.error("Request error!", t);
+                errMsg = "Service error! " + t.getMessage();
             }
+            LOG.error("Request error!", t);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), errMsg);
         }
-
-        resp.setCharacterEncoding(req.getCharacterEncoding());
-        resp.setStatus(HttpServletResponse.SC_OK);
-        resp.getWriter().write(getResultContent(code, message));
-        resp.flushBuffer();
     }
 
     @Override
     public void destroy() {
     }
 
-    private String getResultContent(int code, String message) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{\"code\":\"");
-        builder.append(code);
-        builder.append("\",\"msg\":\"");
-        builder.append(message);
-        builder.append("\"}");
-
-        return builder.toString();
+    private void returnRspPackage(HttpServletResponse resp, String charEncoding,
+                                  int errCode, String errMsg) throws IOException {
+        StringBuilder builder =
+                new StringBuilder().append("{\"code\":\"").append(errCode)
+                        .append("\",\"msg\":\"").append(errMsg).append("\"}");
+        resp.setCharacterEncoding(charEncoding);
+        resp.setStatus(HttpServletResponse.SC_OK);
+        resp.getWriter().write(builder.toString());

Review Comment:
   ## Information exposure through a stack trace
   
   [Error information](1) can be exposed to an external user.
   [Error information](2) can be exposed to an external user.
   
   [Show more details](https://github.com/apache/inlong/security/code-scanning/40)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gosonzhang merged pull request #6413: [INLONG-6406][DataProxy] Support creating sink dynamically after starting

Posted by GitBox <gi...@apache.org>.
gosonzhang merged PR #6413:
URL: https://github.com/apache/inlong/pull/6413


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org