You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/24 06:36:28 UTC

[apisix-java-plugin-runner] branch main updated: feat: complete the filter and response body (#7)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git


The following commit(s) were added to refs/heads/main by this push:
     new a288d21  feat: complete the filter and response body (#7)
a288d21 is described below

commit a288d2147ff43aa6b8852209e8629a3a88198779
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Mon May 24 14:36:18 2021 +0800

    feat: complete the filter and response body (#7)
---
 .../plugin/runner/handler/A6ConfigHandler.java     |  36 ++++-
 .../runner/handler/A6HandlerConfiguration.java     |  40 ++---
 .../plugin/runner/handler/A6HttpCallHandler.java   |  26 ++--
 .../plugin/runner/service/CacheConfiguration.java  |   8 +-
 .../plugin/runner/{A6Config.java => A6Conf.java}   |  16 +-
 .../apache/apisix/plugin/runner/HttpRequest.java   |  96 +++++++++---
 .../apache/apisix/plugin/runner/HttpResponse.java  | 169 +++++++++++++--------
 .../apisix/plugin/runner/filter/FilterChain.java   |  41 -----
 .../filter/{FilterBean.java => PluginFilter.java}  |   9 +-
 .../plugin/runner/filter/PluginFilterChain.java    |  58 +++++++
 10 files changed, 329 insertions(+), 170 deletions(-)

diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
index 65e57c5..7022363 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandler.java
@@ -19,23 +19,55 @@ package org.apache.apisix.plugin.runner.handler;
 
 import com.google.common.cache.Cache;
 import io.github.api7.A6.PrepareConf.Req;
+import io.github.api7.A6.TextEntry;
 import lombok.RequiredArgsConstructor;
+import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
 import org.apache.apisix.plugin.runner.A6Request;
 import org.apache.apisix.plugin.runner.A6Response;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
+import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * Handle APISIX configuration request.
  */
 @RequiredArgsConstructor
 public class A6ConfigHandler implements Handler {
-    private final Cache<Long, Req> cache;
+    private final Logger logger = LoggerFactory.getLogger(A6ConfigHandler.class);
+
+    private final Cache<Long, A6Conf> cache;
+    private final Map<String, PluginFilter> filters;
 
     @Override
     public void handle(A6Request request, A6Response response) {
         Req req = ((A6ConfigRequest) request).getReq();
         long token = ((A6ConfigResponse) response).getConfToken();
-        cache.put(token, req);
+        PluginFilterChain chain = createFilterChain(req);
+        A6Conf config = new A6Conf(req, chain);
+        cache.put(token, config);
+
     }
+
+    private PluginFilterChain createFilterChain(Req req) {
+        List<PluginFilter> chainFilters = new ArrayList<>();
+        for (int i = 0; i < req.confLength(); i++) {
+            TextEntry conf = req.conf(i);
+            PluginFilter filter = filters.get(conf.name());
+            if (Objects.isNull(filter)) {
+                logger.error("receive undefined filter: {}, skip it", conf.name());
+                continue;
+            }
+            chainFilters.add(filter);
+        }
+        return new PluginFilterChain(chainFilters);
+    }
+
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
index c4af130..9497e43 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HandlerConfiguration.java
@@ -19,22 +19,23 @@ package org.apache.apisix.plugin.runner.handler;
 
 import com.google.common.cache.Cache;
 import io.github.api7.A6.Err.Code;
-import io.github.api7.A6.PrepareConf.Req;
+import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
 import org.apache.apisix.plugin.runner.A6ErrRequest;
 import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.A6Response;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
-import org.apache.apisix.plugin.runner.filter.FilterBean;
-import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -43,35 +44,18 @@ public class A6HandlerConfiguration {
     private final Logger logger = LoggerFactory.getLogger(A6HandlerConfiguration.class);
 
     @Bean
-    public A6ConfigHandler createConfigHandler(Cache<Long, Req> cache) {
-        return new A6ConfigHandler(cache);
-    }
-
-    @Bean
-    public A6HttpCallHandler createHttpHandler(ObjectProvider<FilterBean> beanProvider, Cache<Long, Req> cache) {
-        List<FilterBean> filterList = beanProvider.orderedStream().collect(Collectors.toList());
-        FilterChain chain = null;
-        if (!filterList.isEmpty()) {
-            for (int i = filterList.size() - 1; i >= 0; i--) {
-                chain = new FilterChain(filterList.get(i), chain);
-            }
+    public A6ConfigHandler createConfigHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+        List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
+        Map<String, PluginFilter> filterMap = new HashMap<>();
+        for (PluginFilter filter : pluginFilterList) {
+            filterMap.put(filter.getClass().getSimpleName(), filter);
         }
-        return new A6HttpCallHandler(cache, chain);
+        return new A6ConfigHandler(cache, filterMap);
     }
 
     @Bean
-    public FilterBean testFilter() {
-        return new FilterBean() {
-            @Override
-            public void doFilter(HttpRequest request, HttpResponse response, FilterChain chain) {
-
-            }
-
-            @Override
-            public int getOrder() {
-                return 0;
-            }
-        };
+    public A6HttpCallHandler createHttpHandler(Cache<Long, A6Conf> cache) {
+        return new A6HttpCallHandler(cache);
     }
 
     @Bean
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
index b1b5af7..8cb6523 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandler.java
@@ -19,21 +19,24 @@ package org.apache.apisix.plugin.runner.handler;
 
 import com.google.common.cache.Cache;
 import io.github.api7.A6.Err.Code;
-import io.github.api7.A6.PrepareConf.Req;
 import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.A6Config;
+import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.A6Request;
 import org.apache.apisix.plugin.runner.A6Response;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
-import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 
 @RequiredArgsConstructor
 public class A6HttpCallHandler implements Handler {
-    private final Cache<Long, Req> cache;
+    private final Logger logger = LoggerFactory.getLogger(A6HttpCallHandler.class);
 
-    private final FilterChain chain;
+    private final Cache<Long, A6Conf> cache;
 
     @Override
     public void handle(A6Request request, A6Response response) {
@@ -41,16 +44,17 @@ public class A6HttpCallHandler implements Handler {
         HttpResponse rsp = (HttpResponse) response;
 
         long confToken = ((HttpRequest) request).getConfToken();
-        io.github.api7.A6.PrepareConf.Req conf = cache.getIfPresent(confToken);
-        if (null == conf) {
+        A6Conf conf = cache.getIfPresent(confToken);
+        if (Objects.isNull(conf)) {
+            logger.error("cannot find conf-token: {}", confToken);
             A6ErrResponse errResponse = new A6ErrResponse(Code.CONF_TOKEN_NOT_FOUND);
             rsp.setErrResponse(errResponse);
             return;
         }
 
-        A6Config config = new A6Config(conf);
-        req.setConfig(config);
-        chain.doFilter(req, rsp);
-
+        req.initCtx(rsp, conf.getReq());
+        PluginFilterChain chain = conf.getChain();
+        chain.filter(req, rsp);
     }
+
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
index ff3d166..6b6e3a5 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
@@ -19,7 +19,7 @@ package org.apache.apisix.plugin.runner.service;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import io.github.api7.A6.PrepareConf.Req;
+import org.apache.apisix.plugin.runner.A6Conf;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -28,10 +28,10 @@ import java.time.Duration;
 
 @Configuration
 public class CacheConfiguration {
-    
+
     @Bean
-    public Cache<Long, Req> configurationCache(@Value("${cache.config.expired:5000}") long expired,
-                                               @Value("${cache.config.capacity:500}") int capacity) {
+    public Cache<Long, A6Conf> configurationCache(@Value("${cache.config.expired:5000}") long expired,
+                                                  @Value("${cache.config.capacity:500}") int capacity) {
         return CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMillis(expired)).maximumSize(capacity).build();
     }
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Conf.java
similarity index 78%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
rename to runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Conf.java
index 583b76f..42cc4de 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Conf.java
@@ -19,13 +19,25 @@ package org.apache.apisix.plugin.runner;
 
 import io.github.api7.A6.PrepareConf.Req;
 import io.github.api7.A6.TextEntry;
+import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
 
-public class A6Config {
+public class A6Conf {
+
+    public Req getReq() {
+        return req;
+    }
 
     private final Req req;
 
-    public A6Config(Req req) {
+    public PluginFilterChain getChain() {
+        return chain;
+    }
+
+    private final PluginFilterChain chain;
+
+    public A6Conf(Req req, PluginFilterChain chain) {
         this.req = req;
+        this.chain = chain;
     }
 
     public String get(String key) {
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
index b438ecc..06440ef 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
@@ -18,18 +18,24 @@
 package org.apache.apisix.plugin.runner;
 
 import io.github.api7.A6.HTTPReqCall.Req;
-import lombok.Setter;
+import io.github.api7.A6.TextEntry;
+import org.apache.apisix.plugin.runner.filter.PluginFilter;
 
 import java.nio.ByteBuffer;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
-// @Readable
 public class HttpRequest implements A6Request {
 
     private final Req req;
 
-    private int id;
+    private HttpResponse response;
+
+    private io.github.api7.A6.PrepareConf.Req config;
+
+    private Long requestId;
 
     private String sourceIP;
 
@@ -37,39 +43,90 @@ public class HttpRequest implements A6Request {
 
     private String path;
 
-    private Map<String, String> parameter;
-
     private Map<String, String> headers;
 
-    private long confToken;
-
-    @Setter
-    private A6Config config;
-
-    private Map<String, Object> data;
+    private Map<String, String> args;
 
     public HttpRequest(Req req) {
         this.req = req;
     }
 
+    public String getConfig(PluginFilter filter) {
+        for (int i = 0; i < config.confLength(); i++) {
+            TextEntry conf = config.conf(i);
+            if (conf.name().equals(filter.getClass().getSimpleName())) {
+                return conf.value();
+            }
+        }
+        return null;
+    }
+
     public long getRequestId() {
-        return req.id();
+        if (Objects.isNull(requestId)) {
+            requestId = req.id();
+
+        }
+        return requestId;
     }
 
     public String getSourceIP() {
-        return ""; // TODO
+        if (Objects.isNull(sourceIP)) {
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < req.srcIpLength(); i++) {
+                builder.append(req.srcIp(i)).append('.');
+            }
+            sourceIP = builder.substring(0, builder.length() - 1);
+        }
+
+        return sourceIP;
     }
 
     public Method getMethod() {
-        return Method.values()[req.method()];
+        if (Objects.isNull(method)) {
+            method = Method.values()[req.method()];
+        }
+        return method;
     }
 
     public String getPath() {
-        return req.path(); // FiXME
+        if (Objects.isNull(path)) {
+            path = req.path();
+        }
+        return path;
     }
 
-    public String getParameter(String name) {
-        return parameter.get(name);
+    public void setPath(String path) {
+        response.setPath(path);
+    }
+
+    public Map<String, String> getHeaders() {
+        if (Objects.isNull(headers)) {
+            headers = new HashMap<>();
+            for (int i = 0; i < req.headersLength(); i++) {
+                TextEntry header = req.headers(i);
+                headers.put(header.name(), header.value());
+            }
+        }
+        return headers;
+    }
+
+    public void setHeader(String headerKey, String headerValue) {
+        response.setReqHeader(headerKey, headerValue);
+    }
+
+    public Map<String, String> getArgs() {
+        if (Objects.isNull(args)) {
+            args = new HashMap<>();
+            for (int i = 0; i < req.argsLength(); i++) {
+                TextEntry arg = req.args(i);
+                args.put(arg.name(), arg.value());
+            }
+        }
+        return args;
+    }
+
+    public void setArg(String argKey, String argValue) {
+        response.setArgs(argKey, argValue);
     }
 
     public Map getParameterMap() {
@@ -93,6 +150,11 @@ public class HttpRequest implements A6Request {
         return new HttpRequest(req);
     }
 
+    public void initCtx(HttpResponse response, io.github.api7.A6.PrepareConf.Req config) {
+        this.response = response;
+        this.config = config;
+    }
+
     @Override
     public byte getType() {
         return 2;
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
index ffcdf21..f2be0d7 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
@@ -24,7 +24,6 @@ import io.github.api7.A6.HTTPReqCall.Rewrite;
 import io.github.api7.A6.HTTPReqCall.Stop;
 import io.github.api7.A6.TextEntry;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import lombok.Data;
 import org.springframework.util.CollectionUtils;
 
 import java.nio.ByteBuffer;
@@ -38,14 +37,15 @@ import java.util.Objects;
  * action:Action;
  * }
  */
-@Data
 public class HttpResponse implements A6Response {
 
     private final long requestId;
 
     private ActionType actionType;
 
-    private Map<String, String> headers;
+    private Map<String, String> reqHeaders;
+
+    private Map<String, String> respHeaders;
 
     private Map<String, String> args;
 
@@ -58,17 +58,23 @@ public class HttpResponse implements A6Response {
     private A6ErrResponse errResponse;
 
     public HttpResponse(long requestId) {
-        this.requestId = getRequestId();
+        this.requestId = requestId;
+    }
+
+    public long getRequestId() {
+        return requestId;
     }
 
-    public void addHeader(String headerKey, String headerValue) {
-        if (Objects.isNull(headers)) {
-            headers = new HashMap<>();
+    public void setReqHeader(String headerKey, String headerValue) {
+        actionType = ActionType.Rewrite;
+        if (Objects.isNull(reqHeaders)) {
+            reqHeaders = new HashMap<>();
         }
-        headers.put(headerKey, headerValue);
+        reqHeaders.put(headerKey, headerValue);
     }
 
-    public void addArg(String argKey, String argValue) {
+    public void setArgs(String argKey, String argValue) {
+        actionType = ActionType.Rewrite;
         if (Objects.isNull(args)) {
             args = new HashMap<>();
         }
@@ -76,104 +82,145 @@ public class HttpResponse implements A6Response {
     }
 
     public void setPath(String path) {
+        actionType = ActionType.Rewrite;
         this.path = path;
     }
 
+    public void setRespHeaders(String headerKey, String headerValue) {
+        actionType = ActionType.Stop;
+        if (Objects.isNull(respHeaders)) {
+            respHeaders = new HashMap<>();
+        }
+        respHeaders.put(headerKey, headerValue);
+    }
+
+    public void setBody(String bodyKey, String bodyValue) {
+        actionType = ActionType.Stop;
+        if (Objects.isNull(body)) {
+            body = new HashMap<>();
+        }
+        body.put(bodyKey, bodyValue);
+    }
+
+    public void setStatus(HttpResponseStatus status) {
+        actionType = ActionType.Stop;
+        this.status = status;
+    }
+
+    public void setErrResponse(A6ErrResponse errResponse) {
+        this.errResponse = errResponse;
+    }
+
     @Override
     public ByteBuffer encode() {
-
-        if (null != getErrResponse()) {
-            return getErrResponse().encode();
+        if (null != errResponse) {
+            return errResponse.encode();
         }
 
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
-        if (null == getActionType()) {
-            setActionType(A6Response.ActionType.NONE);
+        if (Objects.isNull(actionType)) {
+            actionType = A6Response.ActionType.NONE;
         }
 
         int action = 0;
-        if (getActionType() == A6Response.ActionType.Rewrite) {
-            action = buildRewriteResp(builder);
-        }
 
-        if (getActionType() == A6Response.ActionType.Stop) {
+        if (actionType == A6Response.ActionType.Rewrite) {
+            action = buildRewriteResp(builder);
+        } else if (actionType == A6Response.ActionType.Stop) {
             action = buildStopResp(builder);
         }
 
         Resp.startResp(builder);
         Resp.addAction(builder, action);
-        if (null != getActionType()) {
-            Resp.addActionType(builder, getActionType().getType());
-        }
-
+        Resp.addActionType(builder, actionType.getType());
         Resp.addId(builder, getRequestId());
         builder.finish(Resp.endResp(builder));
         return builder.dataBuffer();
     }
 
     private int buildStopResp(FlatBufferBuilder builder) {
-        Stop.startStop(builder);
-        addHeaders(builder);
-        addBody(builder);
-        Stop.addStatus(builder, getStatus().code());
-        return Stop.endStop(builder);
-    }
+        int headerIndex = -1;
+        if (!CollectionUtils.isEmpty(respHeaders)) {
+            int[] headerTexts = new int[respHeaders.size()];
+            for (Map.Entry<String, String> header : respHeaders.entrySet()) {
+                int i = -1;
+                int key = builder.createString(header.getKey());
+                int value = builder.createString(header.getValue());
+                int text = TextEntry.createTextEntry(builder, key, value);
+                headerTexts[++i] = text;
+            }
+            headerIndex = Stop.createHeadersVector(builder, headerTexts);
+        }
 
-    private void addBody(FlatBufferBuilder builder) {
-        if (!CollectionUtils.isEmpty(getBody())) {
-            byte[] bodyTexts = new byte[getBody().size()];
-            for (Map.Entry<String, String> arg : getBody().entrySet()) {
+        int bodyIndex = -1;
+        if (!CollectionUtils.isEmpty(body)) {
+            byte[] bodyTexts = new byte[body.size()];
+            for (Map.Entry<String, String> arg : body.entrySet()) {
                 int i = -1;
                 int key = builder.createString(arg.getKey());
                 int value = builder.createString(arg.getValue());
                 int text = DataEntry.createDataEntry(builder, key, value);
                 bodyTexts[++i] = (byte) text;
             }
-            int body = Stop.createBodyVector(builder, bodyTexts);
-            Stop.addBody(builder, body);
+            bodyIndex = Stop.createBodyVector(builder, bodyTexts);
+        }
+
+        Stop.startStop(builder);
+        if (!Objects.isNull(status)) {
+            Stop.addStatus(builder, status.code());
         }
+        if (-1 != headerIndex) {
+            Stop.addHeaders(builder, headerIndex);
+        }
+        if (-1 != bodyIndex) {
+            Stop.addBody(builder, bodyIndex);
+        }
+        return Stop.endStop(builder);
     }
 
     private int buildRewriteResp(FlatBufferBuilder builder) {
-        Rewrite.startRewrite(builder);
-        if (null != getPath()) {
-            int path = builder.createString(getPath());
-            Rewrite.addPath(builder, path);
+        int pathIndex = -1;
+        if (Objects.isNull(path)) {
+            pathIndex = builder.createString(path);
         }
-        addHeaders(builder);
-        addArgs(builder);
-        return Rewrite.endRewrite(builder);
-    }
 
-    private void addArgs(FlatBufferBuilder builder) {
-        if (!CollectionUtils.isEmpty(getArgs())) {
-            int[] argTexts = new int[getArgs().size()];
-            for (Map.Entry<String, String> arg : getArgs().entrySet()) {
+        int headerIndex = -1;
+        if (!CollectionUtils.isEmpty(reqHeaders)) {
+            int[] headerTexts = new int[reqHeaders.size()];
+            for (Map.Entry<String, String> header : reqHeaders.entrySet()) {
                 int i = -1;
-                int key = builder.createString(arg.getKey());
-                int value = builder.createString(arg.getValue());
+                int key = builder.createString(header.getKey());
+                int value = builder.createString(header.getValue());
                 int text = TextEntry.createTextEntry(builder, key, value);
-                argTexts[++i] = text;
+                headerTexts[++i] = text;
             }
-            int args = Rewrite.createArgsVector(builder, argTexts);
-            Rewrite.addArgs(builder, args);
+            headerIndex = Rewrite.createHeadersVector(builder, headerTexts);
         }
-    }
 
-    private void addHeaders(FlatBufferBuilder builder) {
-        if (!CollectionUtils.isEmpty(getHeaders())) {
-            int[] headerTexts = new int[getHeaders().size()];
-            for (Map.Entry<String, String> header : getHeaders().entrySet()) {
+        int argsIndex = -1;
+        if (!CollectionUtils.isEmpty(args)) {
+            int[] argTexts = new int[args.size()];
+            for (Map.Entry<String, String> arg : args.entrySet()) {
                 int i = -1;
-                int key = builder.createString(header.getKey());
-                int value = builder.createString(header.getValue());
+                int key = builder.createString(arg.getKey());
+                int value = builder.createString(arg.getValue());
                 int text = TextEntry.createTextEntry(builder, key, value);
-                headerTexts[++i] = text;
+                argTexts[++i] = text;
             }
-            int headers = Rewrite.createHeadersVector(builder, headerTexts);
-            Rewrite.addHeaders(builder, headers);
+            argsIndex = Rewrite.createArgsVector(builder, argTexts);
+        }
+        Rewrite.startRewrite(builder);
+        if (-1 != pathIndex) {
+            Rewrite.addPath(builder, pathIndex);
         }
+        if (-1 != headerIndex) {
+            Rewrite.addHeaders(builder, headerIndex);
+        }
+        if (-1 != argsIndex) {
+            Rewrite.addArgs(builder, argsIndex);
+        }
+        return Rewrite.endRewrite(builder);
     }
 
     @Override
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterChain.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterChain.java
deleted file mode 100644
index f1d18f9..0000000
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterChain.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.filter;
-
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.HttpResponse;
-
-import java.util.Objects;
-
-public class FilterChain {
-    private final FilterBean filter;
-    
-    private final FilterChain next;
-    
-    public FilterChain(FilterBean filter, FilterChain next) {
-        this.filter = filter;
-        this.next = next;
-    }
-    
-    public void doFilter(HttpRequest request, HttpResponse response) {
-        if (Objects.isNull(next)) {
-            return;
-        }
-        filter.doFilter(request, response, next);
-    }
-}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterBean.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
similarity index 84%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterBean.java
rename to runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
index 08244ac..a8a9b20 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/FilterBean.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilter.java
@@ -20,9 +20,10 @@ package org.apache.apisix.plugin.runner.filter;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.springframework.core.Ordered;
+import reactor.core.publisher.Mono;
+
+public interface PluginFilter extends Ordered {
+
+    Mono<Void> filter(HttpRequest request, HttpResponse response, PluginFilterChain chain);
 
-public interface FilterBean extends Ordered {
-    
-    void doFilter(HttpRequest request, HttpResponse response, FilterChain filterChain);
-    
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
new file mode 100644
index 0000000..e1a8960
--- /dev/null
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/filter/PluginFilterChain.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.filter;
+
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.springframework.core.OrderComparator;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+public class PluginFilterChain {
+    private final int index;
+
+    private final List<PluginFilter> filters;
+
+    public PluginFilterChain(List<PluginFilter> filters) {
+        // sort filters in order
+        OrderComparator.sort(filters);
+        this.filters = filters;
+        this.index = 0;
+    }
+
+    public PluginFilterChain(PluginFilterChain parent, int index) {
+        this.filters = parent.getFilters();
+        this.index = index;
+    }
+
+    public List<PluginFilter> getFilters() {
+        return filters;
+    }
+
+    public Mono<Void> filter(HttpRequest request, HttpResponse response) {
+        if (this.index < filters.size()) {
+            PluginFilter filter = filters.get(this.index);
+            PluginFilterChain next = new PluginFilterChain(this,
+                    this.index + 1);
+            return filter.filter(request, response, next);
+        } else {
+            return Mono.empty();
+        }
+    }
+}