You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by tz...@apache.org on 2022/10/09 09:00:48 UTC

[apisix-java-plugin-runner] branch main updated: feat: support for getting upstream response body (#200)

This is an automated email from the ASF dual-hosted git repository.

tzssangglass pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git


The following commit(s) were added to refs/heads/main by this push:
     new d5421c2  feat: support for getting upstream response body (#200)
d5421c2 is described below

commit d5421c292ef0d0ea93fb484afef668d7e2fc8767
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Sun Oct 9 17:00:42 2022 +0800

    feat: support for getting upstream response body (#200)
---
 ci/apisix/config.yaml                              |  33 +++---
 ci/docker-compose.yml                              |   1 -
 docs/en/latest/development.md                      |   1 +
 docs/en/latest/installation-guide.md               |   2 +-
 .../plugin/runner/handler/RpcCallHandler.java      | 130 ++++++++++++++-------
 runner-plugin-sdk/pom.xml                          |   2 +-
 .../apisix/plugin/runner/ExtraInfoRequest.java     |  12 +-
 .../apache/apisix/plugin/runner/PostRequest.java   |  24 ++++
 .../apisix/plugin/runner/filter/PluginFilter.java  |   9 ++
 .../runner/filter/PostReqWithVarsFilter.java       |  75 ++++++++++++
 tests/e2e/plugins/plugins_post_with_vars_test.go   |  78 +++++++++++++
 11 files changed, 302 insertions(+), 65 deletions(-)

diff --git a/ci/apisix/config.yaml b/ci/apisix/config.yaml
index 9f480de..a956ec2 100644
--- a/ci/apisix/config.yaml
+++ b/ci/apisix/config.yaml
@@ -15,23 +15,22 @@
 # limitations under the License.
 #
 
-
-apisix:
-  allow_admin:
-    - 0.0.0.0/0
-  enable_control: true
-  control:
-    ip: "0.0.0.0"
-    port: 9092
-  admin_key:
-    - name: admin
-      key: edd1c9f034335f136f87ad84b625c8f1
-      role: admin
-etcd:
-   host:
-     - http://etcd:2379
-   prefix: "/apisix"
-   timeout: 30
+deployment:
+  role: traditional
+  role_traditional:
+    config_provider: etcd
+  admin:
+    admin_key:
+      - name: admin
+        key: edd1c9f034335f136f87ad84b625c8f1  # using a fixed API token is a security risk; please change it when you deploy to a production environment
+        role: admin
+    allow_admin:
+      - 0.0.0.0/0
+  etcd:
+    host:
+      - "http://etcd:2379"
+    prefix: "/apisix"
+    timeout: 30
 ext-plugin:
   path_for_test: /tmp/runner.sock
 nginx_config:
diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml
index b0da709..ddbbc63 100644
--- a/ci/docker-compose.yml
+++ b/ci/docker-compose.yml
@@ -31,7 +31,6 @@ services:
       - "9180:9180/tcp"
       - "9091:9091/tcp"
       - "9443:9443/tcp"
-      - "9092:9092/tcp"
     networks:
       apisix:
 
diff --git a/docs/en/latest/development.md b/docs/en/latest/development.md
index 567701d..91800fb 100644
--- a/docs/en/latest/development.md
+++ b/docs/en/latest/development.md
@@ -159,6 +159,7 @@ and you can also set the `PostResponse` to override the origin upstream response
 * request.getConfig()
 * request.getUpstreamHeaders()
 * request.getUpstreamStatusCode()
+* request.getBody()
 
 ##### PostResponse
 
diff --git a/docs/en/latest/installation-guide.md b/docs/en/latest/installation-guide.md
index 1822b72..5a4b58c 100644
--- a/docs/en/latest/installation-guide.md
+++ b/docs/en/latest/installation-guide.md
@@ -34,7 +34,7 @@ Prerequisites
 -------------
 
 * JDK 11
-* APISIX 2.15.0
+* APISIX master branch
 * Refer to [Debug](how-it-works.md#debug)  to build the debug environment.
 
 Install
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java
index 94ebcd2..ed5ff04 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java
@@ -57,6 +57,7 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
     private final Logger logger = LoggerFactory.getLogger(RpcCallHandler.class);
 
     private final static String EXTRA_INFO_REQ_BODY_KEY = "request_body";
+    private final static String EXTRA_INFO_RESP_BODY_KEY = "response_body";
 
     private final Cache<Long, A6Conf> cache;
 
@@ -106,6 +107,62 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
         }
     }
 
+    /**
+     * Collects the extra info (nginx vars, request body, response body) required by the
+     * filters of the chain and sends one ExtraInfo request for each of them.
+     *
+     * @return {requiredVars, requiredReqBody, requiredRespBody}, or null when the pending
+     * queue is full (already reported through errorHandle), so callers must stop processing
+     */
+    private Boolean[] fetchExtraInfo(ChannelHandlerContext ctx, PluginFilterChain chain) {
+        Set<String> varKeys = new HashSet<>();
+        boolean requiredReqBody = false;
+        boolean requiredRespBody = false;
+
+        for (PluginFilter filter : chain.getFilters()) {
+            Collection<String> vars = filter.requiredVars();
+            if (!CollectionUtils.isEmpty(vars)) {
+                varKeys.addAll(vars);
+            }
+            requiredReqBody |= Boolean.TRUE.equals(filter.requiredBody());
+            requiredRespBody |= Boolean.TRUE.equals(filter.requiredRespBody());
+        }
+        boolean requiredVars = !varKeys.isEmpty();
+
+        // fetch the nginx vars
+        for (String varKey : varKeys) {
+            if (!requestExtraInfo(ctx, varKey, new ExtraInfoRequest(varKey, null, null))) {
+                return null;
+            }
+        }
+
+        // fetch the request body
+        if (requiredReqBody
+                && !requestExtraInfo(ctx, EXTRA_INFO_REQ_BODY_KEY, new ExtraInfoRequest(null, true, null))) {
+            return null;
+        }
+
+        // fetch the response body
+        if (requiredRespBody
+                && !requestExtraInfo(ctx, EXTRA_INFO_RESP_BODY_KEY, new ExtraInfoRequest(null, null, true))) {
+            return null;
+        }
+
+        return new Boolean[]{requiredVars, requiredReqBody, requiredRespBody};
+    }
+
+    /** Queues the key and sends its request; calls errorHandle and returns false if the queue is full. */
+    private boolean requestExtraInfo(ChannelHandlerContext ctx, String key, ExtraInfoRequest request) {
+        if (!queue.offer(key)) {
+            logger.error("queue is full");
+            errorHandle(ctx, Code.SERVICE_UNAVAILABLE);
+            return false;
+        }
+        ChannelFuture future = ctx.writeAndFlush(request);
+        future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        return true;
+    }
+
     private void handleHttpRespCall(ChannelHandlerContext ctx, PostRequest request) {
         cleanCtx();
 
@@ -129,7 +186,14 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
             return;
         }
 
-        doPostFilter(ctx);
+        Boolean[] result = fetchExtraInfo(ctx, chain);
+        if (Objects.isNull(result)) {
+            return;
+        }
+        if (!result[0] && !result[2]) {
+            // no need to fetch extra info
+            doPostFilter(ctx);
+        }
     }
 
     private void doPostFilter(ChannelHandlerContext ctx) {
@@ -141,6 +205,7 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
         }
 
         postReq.initCtx(conf.getConfig());
+        postReq.setVars(nginxVars);
 
         PluginFilterChain chain = conf.getChain();
         chain.postFilter(postReq, postResp);
@@ -159,12 +224,24 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
         }
 
         if (EXTRA_INFO_REQ_BODY_KEY.equals(varsKey)) {
-            currReq.setBody(result);
-        } else {
+            if (!Objects.isNull(currReq)) {
+                currReq.setBody(result);
+            }
+        } else if (EXTRA_INFO_RESP_BODY_KEY.equals(varsKey)) {
+            if (!Objects.isNull(postReq)) {
+                postReq.setBody(result);
+            }
+        }
+        else {
             nginxVars.put(varsKey, result);
         }
+
         if (queue.isEmpty()) {
-            doFilter(ctx);
+            if (currReq != null) {
+                doFilter(ctx);
+            } else if (postReq != null) {
+                doPostFilter(ctx);
+            }
         }
     }
 
@@ -215,47 +292,12 @@ public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> {
             return;
         }
 
-        // fetch the nginx variables
-        Set<String> varKeys = new HashSet<>();
-        boolean requiredBody = false;
-        boolean requiredVars = false;
-
-        for (PluginFilter filter : chain.getFilters()) {
-            Collection<String> vars = filter.requiredVars();
-            if (!CollectionUtils.isEmpty(vars)) {
-                varKeys.addAll(vars);
-                requiredVars = true;
-            }
-
-            if (filter.requiredBody() != null && filter.requiredBody()) {
-                requiredBody = true;
-            }
-        }
-
-        if (varKeys.size() > 0) {
-            for (String varKey : varKeys) {
-                boolean offer = queue.offer(varKey);
-                if (!offer) {
-                    logger.error("queue is full");
-                    errorHandle(ctx, Code.SERVICE_UNAVAILABLE);
-                    return;
-                }
-                ExtraInfoRequest extraInfoRequest = new ExtraInfoRequest(varKey, null);
-                ChannelFuture future = ctx.writeAndFlush(extraInfoRequest);
-                future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-            }
-        }
-
-        // fetch the request body
-        if (requiredBody) {
-            queue.offer(EXTRA_INFO_REQ_BODY_KEY);
-            ExtraInfoRequest extraInfoRequest = new ExtraInfoRequest(null, true);
-            ChannelFuture future = ctx.writeAndFlush(extraInfoRequest);
-            future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        Boolean[] result = fetchExtraInfo(ctx, chain);
+        if (Objects.isNull(result)) {
+            return;
         }
-
-        // no need to fetch the nginx variables or request body, just do filter
-        if (!requiredBody && !requiredVars) {
+        if (!result[0] && !result[1]) {
+            // no need to fetch extra info
             doFilter(ctx);
         }
     }
diff --git a/runner-plugin-sdk/pom.xml b/runner-plugin-sdk/pom.xml
index e72831f..065e5a7 100644
--- a/runner-plugin-sdk/pom.xml
+++ b/runner-plugin-sdk/pom.xml
@@ -36,7 +36,7 @@
         <dependency>
             <groupId>io.github.api7</groupId>
             <artifactId>A6</artifactId>
-            <version>0.5.0-RELEASE</version>
+            <version>0.6.0-RELEASE</version>
         </dependency>
 
         <dependency>
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
index eb1954d..460b19c 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/ExtraInfoRequest.java
@@ -20,6 +20,7 @@ package org.apache.apisix.plugin.runner;
 import com.google.flatbuffers.FlatBufferBuilder;
 import io.github.api7.A6.ExtraInfo.Info;
 import io.github.api7.A6.ExtraInfo.ReqBody;
+import io.github.api7.A6.ExtraInfo.RespBody;
 import io.github.api7.A6.PrepareConf.Req;
 
 import java.nio.ByteBuffer;
@@ -30,9 +31,12 @@ public class ExtraInfoRequest implements A6Response {
 
     private final Boolean reqBody;
 
-    public ExtraInfoRequest(String var, Boolean reqBody) {
+    private final Boolean reqRespBody;
+
+    public ExtraInfoRequest(String var, Boolean reqBody, Boolean reqRespBody) {
         this.var = var;
         this.reqBody = reqBody;
+        this.reqRespBody = reqRespBody;
     }
 
     @Override
@@ -53,6 +57,12 @@ public class ExtraInfoRequest implements A6Response {
             buildExtraInfo(reqBodyReq, Info.ReqBody, builder);
         }
 
+        if (this.reqRespBody != null && this.reqRespBody) {
+            io.github.api7.A6.ExtraInfo.RespBody.startRespBody(builder);
+            int reqBodyResp = RespBody.endRespBody(builder);
+            buildExtraInfo(reqBodyResp, Info.RespBody, builder);
+        }
+
         builder.finish(Req.endReq(builder));
         return builder.dataBuffer();
     }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/PostRequest.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/PostRequest.java
index 90e94fa..ba4179a 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/PostRequest.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/PostRequest.java
@@ -20,6 +20,7 @@ package org.apache.apisix.plugin.runner;
 import io.github.api7.A6.HTTPRespCall.Req;
 import io.github.api7.A6.TextEntry;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
+import org.springframework.util.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -37,6 +38,10 @@ public class PostRequest implements A6Request {
 
     private Integer status;
 
+    private String body;
+
+    private Map<String, String> vars;
+
     public PostRequest(Req req) {
         this.req = req;
     }
@@ -87,4 +92,23 @@ public class PostRequest implements A6Request {
         }
         return status;
     }
+
+    public void setBody(String body) {
+        this.body = body;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    /**
+     * Returns the value stored for the given nginx variable, or null if there is none.
+     */
+    public String getVars(String key) {
+        return CollectionUtils.isEmpty(vars) ? null : vars.get(key);
+    }
+
+    public void setVars(Map<String, String> vars) {
+        this.vars = vars;
+    }
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
index b2e7ba1..0136700 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
@@ -67,5 +67,14 @@ public interface PluginFilter {
     default Boolean requiredBody() {
         return false;
     }
+
+    /**
+     * Whether the plugin needs the response body of the upstream server.
+     *
+     * @return true if the response body is needed; false by default
+     */
+    default Boolean requiredRespBody() {
+        return false;
+    }
 }
 
diff --git a/sample/src/main/java/org/apache/apisix/plugin/runner/filter/PostReqWithVarsFilter.java b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/PostReqWithVarsFilter.java
new file mode 100644
index 0000000..54ecf85
--- /dev/null
+++ b/sample/src/main/java/org/apache/apisix/plugin/runner/filter/PostReqWithVarsFilter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.filter;
+
+import com.google.gson.Gson;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.PostRequest;
+import org.apache.apisix.plugin.runner.PostResponse;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PostReqWithVarsFilter implements PluginFilter {
+    private static final Gson GSON = new Gson();
+
+    @Override
+    public String name() {
+        return "PostReqWithVarsFilter";
+    }
+
+    /** Pre-request phase: rewrites the request path to the configured "rewrite_path". */
+    @Override
+    public void filter(HttpRequest request, HttpResponse response, PluginFilterChain chain) {
+        Map<String, Object> conf = parseJson(request.getConfig(this));
+        request.setPath((String) conf.get("rewrite_path"));
+        chain.filter(request, response);
+    }
+
+    /** Post-response phase: checks the upstream body and echoes the server_port var as a header. */
+    @Override
+    public void postFilter(PostRequest request, PostResponse response, PluginFilterChain chain) {
+        Map<String, Object> conf = parseJson(request.getConfig(this));
+        Map<String, Object> body = parseJson(request.getBody());
+        assert body.get("url").toString().endsWith((String) conf.get("rewrite_path"));
+        response.setHeader("server_port", request.getVars("server_port"));
+        chain.postFilter(request, response);
+    }
+
+    @Override
+    public List<String> requiredVars() {
+        List<String> needed = new ArrayList<>();
+        needed.add("server_port");
+        return needed;
+    }
+
+    @Override
+    public Boolean requiredRespBody() {
+        return true;
+    }
+
+    /** Parses a JSON object into a map of its top-level members. */
+    private static Map<String, Object> parseJson(String json) {
+        return GSON.fromJson(json, HashMap.class);
+    }
+}
diff --git a/tests/e2e/plugins/plugins_post_with_vars_test.go b/tests/e2e/plugins/plugins_post_with_vars_test.go
new file mode 100644
index 0000000..2259013
--- /dev/null
+++ b/tests/e2e/plugins/plugins_post_with_vars_test.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package plugins_test
+
+import (
+	"github.com/gavv/httpexpect/v2"
+	"github.com/onsi/ginkgo"
+	"github.com/onsi/ginkgo/extensions/table"
+	"net/http"
+
+	"github.com/apache/apisix-java-plugin-runner/tests/e2e/tools"
+)
+
+var _ = ginkgo.Describe("Post", func() {
+	table.DescribeTable("test route create and update",
+		func(tc tools.HttpTestCase) {
+			tools.RunTestCase(tc)
+		},
+		table.Entry("create java runner post plugin route success", tools.HttpTestCase{
+			Object: tools.PutA6Conf(),
+			Method: http.MethodPut,
+			Path:   "/apisix/admin/routes/1",
+			Body: `{
+				"uri":"/test/java/runner/postvars",
+				"plugins":{
+					"ext-plugin-pre-req":{
+						"conf":[
+							{
+								"name":"PostReqWithVarsFilter",
+								"value":"{\"rewrite_path\":\"/get\"}"
+							}
+						]
+					},
+					"ext-plugin-post-resp":{
+						"conf":[
+							{
+								"name":"PostReqWithVarsFilter",
+								"value":"{\"rewrite_path\":\"/get\"}"
+							}
+						]
+					}
+				},
+				"upstream":{
+					"nodes":{
+						"web:8888":1
+					},
+					"type":"roundrobin"
+				}
+			}`,
+			Headers:           map[string]string{"X-API-KEY": tools.GetAdminToken()},
+			ExpectStatusRange: httpexpect.Status2xx,
+		}),
+		table.Entry("test java runner post plugin route success", tools.HttpTestCase{
+			Object: tools.GetA6Expect(),
+			Method: http.MethodGet,
+			Path:   "/test/java/runner/postvars",
+			ExpectHeaders: map[string]string{
+				"server-port": "9080",
+			},
+			ExpectStatus: http.StatusOK,
+		}),
+	)
+})