You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/19 09:10:04 UTC

[apisix-java-plugin-runner] branch main updated: feat: adjust protocol handling and refine logic (#3)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git


The following commit(s) were added to refs/heads/main by this push:
     new 62c2e2c  feat: adjust protocol handling and refine logic (#3)
62c2e2c is described below

commit 62c2e2c55460436589e52ebf6072639ab39085e3
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Wed May 19 17:09:58 2021 +0800

    feat: adjust protocol handling and refine logic (#3)
---
 pom.xml                                            |   5 +
 .../runner/codec/PluginRunnerConfiguration.java    |  18 +-
 .../plugin/runner/codec/frame/FrameCodec.java      |  68 ++++++++
 .../plugin/runner/codec/frame/FrameType.java       |  27 +--
 .../runner/codec/impl/FlatBuffersDecoder.java      |  45 -----
 .../runner/codec/impl/FlatBuffersEncoder.java      |  31 ----
 .../runner/handler/DefaultPayloadHandler.java      |  99 +++++++++++
 .../apisix/plugin/runner/handler/ErrHandler.java   |  46 +++---
 .../Filter.java}                                   |  11 +-
 .../plugin/runner/handler/HTTPReqCallHandler.java  | 182 +++++++++++++++++++++
 .../apisix/plugin/runner/handler/IOHandler.java    |  18 +-
 .../runner/handler/IOHandlerConfiguration.java     |   6 +-
 .../{ServerHandler.java => PayloadHandler.java}    |  17 +-
 .../plugin/runner/handler/PrepareConfHandler.java  |  69 ++++++++
 .../RequestHandler.java}                           |  16 +-
 .../runner/server/config/IOHandlerCustomizer.java  |  27 ---
 .../plugin/runner/service/CacheConfiguration.java  |   6 +-
 .../{A6HttpCallResponse.java => A6Config.java}     |  33 ++--
 .../apache/apisix/plugin/runner/A6Response.java    |  27 +--
 .../apache/apisix/plugin/runner/HttpRequest.java   |  60 +++----
 .../apache/apisix/plugin/runner/HttpResponse.java  |  36 ++--
 runner-starter/src/main/resources/application.yaml |   2 +-
 22 files changed, 588 insertions(+), 261 deletions(-)

diff --git a/pom.xml b/pom.xml
index c7beacb..0405e72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,11 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+            <classifier>linux-aarch_64</classifier>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
             <artifactId>netty-tcnative-boringssl-static</artifactId>
         </dependency>
         <dependency>
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
index ac314c1..03f26b4 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
@@ -17,22 +17,18 @@
 
 package org.apache.apisix.plugin.runner.codec;
 
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersDecoder;
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersEncoder;
+import com.google.common.cache.Cache;
+import io.github.api7.A6.PrepareConf.Req;
+import org.apache.apisix.plugin.runner.handler.DefaultPayloadHandler;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
 public class PluginRunnerConfiguration {
-    
-    @Bean
-    public PluginRunnerDecoder createDecoder() {
-        return new FlatBuffersDecoder();
-    }
-    
+
     @Bean
-    public PluginRunnerEncoder createEncoder() {
-        return new FlatBuffersEncoder();
+    public DefaultPayloadHandler createPayloadHandler(Cache<Long, Req> cache) {
+        return new DefaultPayloadHandler(cache);
     }
-    
+
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java
new file mode 100644
index 0000000..b50e286
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.codec.frame;
+
+import java.nio.ByteBuffer;
+
+public class FrameCodec {
+    /** Reads the 3-byte big-endian length field at the payload's position, advancing it by three bytes. */
+    public static int getDataLength(ByteBuffer payload) {
+        byte[] bytes = new byte[3];
+        for (int i = 0; i < 3; i++) {
+            bytes[i] = payload.get();
+        }
+        return byte3ToInt(bytes);
+    }
+    /** Consumes the length field and returns a view limited to the next {@code length} bytes of the payload. */
+    public static ByteBuffer getBody(ByteBuffer payload) {
+        int length = getDataLength(payload);
+        ByteBuffer buffer = payload.slice();
+        byte[] dst = new byte[length];
+        buffer.get(dst, 0, length); // moves the slice's position to the end of the body
+        buffer.flip(); // limit = length, position = 0
+        return buffer;
+    }
+    /** Wraps the payload's remaining bytes in a frame: one type byte, a 3-byte length, then the data. */
+    public static ByteBuffer setBody(ByteBuffer payload, FrameType frameType) {
+        byte[] data = new byte[payload.remaining()];
+        payload.get(data);
+        ByteBuffer buffer = ByteBuffer.allocate(data.length + 4);
+        buffer.put(frameType.getType());
+        // data length
+        byte[] length = intToByte3(data.length);
+        buffer.put(length);
+        // data
+        buffer.put(data);
+        buffer.flip();
+        return buffer;
+    }
+    /** Encodes the low 24 bits of {@code i} as three bytes, most significant byte first. */
+    private static byte[] intToByte3(int i) {
+        byte[] targets = new byte[3];
+        targets[2] = (byte) (i & 0xFF);
+        targets[1] = (byte) (i >> 8 & 0xFF);
+        targets[0] = (byte) ((i >> 16 & 0xFF));
+        return targets;
+    }
+    /** Inverse of intToByte3; each byte is masked BEFORE shifting because {@code <<} binds tighter than {@code &}. */
+    private static int byte3ToInt(byte[] bytes) {
+        return (bytes[2] & 0xFF) |
+                ((bytes[1] & 0xFF) << 8) |
+                ((bytes[0] & 0xFF) << 16);
+    }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
similarity index 72%
copy from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
copy to runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
index 4a15a5e..118155b 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner;
+package org.apache.apisix.plugin.runner.codec.frame;
 
-import java.nio.ByteBuffer;
+public enum FrameType {
+    RPC_ERROR((byte) 0),
 
-public interface A6Response {
-    
-    ByteBuffer encode();
-    
-    enum Action {
-        
-        STOP,
-        REWRITE,
-        ERR
-        
+    RPC_PREPARE_CONF((byte) 1),
+
+    RPC_HTTP_REQ_CALL((byte) 2);
+
+    private final byte type;
+
+    FrameType(byte type) {
+        this.type = type;
+    }
+
+    public byte getType() {
+        return type;
     }
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
deleted file mode 100644
index a551f50..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.codec.impl;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.apisix.plugin.runner.A6ConfigRequest;
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-
-import java.nio.ByteBuffer;
-
-public class FlatBuffersDecoder implements PluginRunnerDecoder {
-    
-    @Override
-    public A6Request decode(ByteBuf buf) {
-        final byte type = buf.readByte();
-        final short length = buf.readShort();
-        
-        ByteBuffer buffer = ByteBuffer.allocate(length);
-        buf.getBytes(length, buffer);
-        buffer.flip();
-        
-        if (type == 0) {
-            return A6ConfigRequest.from(buffer);
-        } else {
-            return HttpRequest.from(buffer);
-        }
-    }
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
deleted file mode 100644
index 4e04ebf..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.codec.impl;
-
-import org.apache.apisix.plugin.runner.A6Response;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
-
-import java.nio.ByteBuffer;
-
-public class FlatBuffersEncoder implements PluginRunnerEncoder {
-    
-    @Override
-    public ByteBuffer encode(A6Response response) {
-        return response.encode();
-    }
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java
new file mode 100644
index 0000000..90b8eed
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.Err.Code;
+import io.github.api7.A6.PrepareConf.Req;
+import lombok.RequiredArgsConstructor;
+import org.apache.apisix.plugin.runner.codec.frame.FrameCodec;
+import org.apache.apisix.plugin.runner.filter.FilterBean;
+import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@RequiredArgsConstructor
+public class DefaultPayloadHandler implements PayloadHandler {
+
+    private final Logger logger = LoggerFactory.getLogger(DefaultPayloadHandler.class);
+
+    // Builder returned by the most recent dispatch() call.
+    FlatBufferBuilder builder;
+
+    private final Cache<Long, Req> cache;
+
+    /** Selects the handler for the frame's type byte; unknown types are logged and answered with BAD_REQUEST. */
+    public RequestHandler decode(ByteBuffer buffer) {
+        byte type = buffer.get();
+        ByteBuffer body = FrameCodec.getBody(buffer);
+
+        switch (type) {
+            case 1:
+                return new PrepareConfHandler(body, cache);
+            case 2:
+//                FilterChain chain = createFilterChain(null);
+                return new HTTPReqCallHandler(body, cache, null);
+            default:
+                break;
+        }
+
+        logger.error("receiving unsupport type: {}", type);
+        return error(Code.BAD_REQUEST);
+    }
+
+    /** Wraps the ordered filter beans from last to first, so the returned chain holds the first bean; null if none. */
+    private FilterChain createFilterChain(ObjectProvider<FilterBean> beanProvider) {
+        List<FilterBean> filterList = beanProvider.orderedStream().collect(Collectors.toList());
+        FilterChain chain = null;
+        if (!filterList.isEmpty()) {
+            for (int i = filterList.size() - 1; i >= 0; i--) {
+                chain = new FilterChain(filterList.get(i), chain);
+            }
+        }
+        return chain;
+    }
+
+    /** Frames the response built by the GIVEN handler (not a remembered one), using that handler's frame type. */
+    public ByteBuffer encode(RequestHandler handler) {
+        ByteBuffer buffer = handler.builder().dataBuffer();
+        return FrameCodec.setBody(buffer, handler.type());
+    }
+
+    @Override
+    public RequestHandler error() {
+        logger.error("process rpc call error");
+        return error(Code.SERVICE_UNAVAILABLE);
+    }
+
+    /** Runs the given handler against a fresh builder and returns it so encode() can frame its result. */
+    public RequestHandler dispatch(RequestHandler handler) {
+        builder = handler.handler(new FlatBufferBuilder());
+        return handler;
+    }
+
+    private RequestHandler error(int code) {
+        return new ErrHandler(code);
+    }
+
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
similarity index 50%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
index 5d1b669..cd8a76a 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
@@ -15,33 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner;
+package org.apache.apisix.plugin.runner.handler;
 
-import io.github.api7.A6.PrepareConf.Req;
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.Err.Resp;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
 
-import java.nio.ByteBuffer;
+public class ErrHandler implements RequestHandler {
+    private static final FrameType TYPE = FrameType.RPC_ERROR;
 
-public class A6ConfigRequest implements A6Request {
-    
-    private final int confToken;
-    
-    public A6ConfigRequest(int confToken) {
-        this.confToken = confToken;
+    private int code = -1;
+    private FlatBufferBuilder builder;
+
+    public ErrHandler(int code) {
+        this.code = code;
     }
-    
+
     @Override
-    public boolean isConfigRequest() {
-        return true;
+    public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+        this.builder = builder;
+        Resp.startResp(builder);
+        Resp.addCode(builder, code == -1 ? 1 : code);
+        int orc = Resp.endResp(builder);
+        builder.finish(orc);
+        return builder;
     }
-    
+
     @Override
-    public int getConfToken() {
-        return confToken;
+    public FlatBufferBuilder builder() {
+        return this.builder;
     }
-    
-    public static A6ConfigRequest from(ByteBuffer buffer) {
-        // TODO request id and confToken came from client?
-        Req req = Req.getRootAsReq(buffer);
-        return new A6ConfigRequest(0);
+
+    @Override
+    public FrameType type() {
+        return TYPE;
     }
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
similarity index 82%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
index fe4221b..6f79e81 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
@@ -15,14 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.handler;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.A6Response;
 
-@FunctionalInterface
-public interface PluginRunnerDecoder {
-    
-    A6Request decode(ByteBuf buf);
-    
+public interface Filter {
+    A6Response filter(A6Request req);
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
new file mode 100644
index 0000000..f4e3fcc
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import com.google.flatbuffers.Table;
+import io.github.api7.A6.DataEntry;
+import io.github.api7.A6.Err.Code;
+import io.github.api7.A6.HTTPReqCall.Req;
+import io.github.api7.A6.HTTPReqCall.Resp;
+import io.github.api7.A6.HTTPReqCall.Rewrite;
+import io.github.api7.A6.HTTPReqCall.Stop;
+import io.github.api7.A6.TextEntry;
+import org.apache.apisix.plugin.runner.A6Config;
+import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.A6Response;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
+import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.springframework.util.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class HTTPReqCallHandler implements RequestHandler, Filter {
+
+    private static final FrameType TYPE = FrameType.RPC_HTTP_REQ_CALL;
+    private final Table req;
+    private FlatBufferBuilder builder;
+    private final Cache<Long, io.github.api7.A6.PrepareConf.Req> cache;
+    private final FilterChain chain;
+
+    public HTTPReqCallHandler(ByteBuffer body,
+                              Cache<Long, io.github.api7.A6.PrepareConf.Req> cache,
+                              FilterChain chain) {
+        this.req = Req.getRootAsReq(body);
+        this.cache = cache;
+        this.chain = chain;
+    }
+
+    @Override
+    public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+        this.builder = builder;
+        Req req = (Req) this.req;
+
+        io.github.api7.A6.PrepareConf.Req conf = cache.getIfPresent(req.confToken());
+        if (null == conf) {
+            io.github.api7.A6.Err.Resp.startResp(builder);
+            io.github.api7.A6.Err.Resp.addCode(builder, Code.CONF_TOKEN_NOT_FOUND);
+            int orc = io.github.api7.A6.Err.Resp.endResp(builder);
+            builder.finish(orc);
+            return builder;
+        }
+
+        A6Config config = new A6Config(conf);
+        HttpRequest request = new HttpRequest(req, config);
+        HttpResponse response = new HttpResponse((int) req.id());
+//        chain.doFilter(request, response);
+        return buildRes(builder, response);
+    }
+
+    private FlatBufferBuilder buildRes(FlatBufferBuilder builder, HttpResponse response) {
+
+        if (null == response.getActionType()) {
+            response.setActionType(A6Response.ActionType.NONE);
+        }
+
+        int action = 0;
+        if (response.getActionType() == A6Response.ActionType.Rewrite) {
+            action = buildRewriteResp(builder, response);
+        }
+
+        if (response.getActionType() == A6Response.ActionType.Stop) {
+            action = buildStopResp(builder, response);
+        }
+
+        Resp.startResp(builder);
+        Resp.addAction(builder, action);
+        if (null != response.getActionType()) {
+            Resp.addActionType(builder, response.getActionType().getType());
+        }
+        Resp.addId(builder, response.getRequestId());
+        builder.finish(Resp.endResp(builder));
+        return builder;
+    }
+
+    private int buildStopResp(FlatBufferBuilder builder, HttpResponse response) {
+        Stop.startStop(builder);
+        addHeaders(builder, response);
+        addBody(builder, response);
+        Stop.addStatus(builder, response.getStatus().code());
+        return Stop.endStop(builder);
+    }
+
+    private void addBody(FlatBufferBuilder builder, HttpResponse response) {
+        if (!CollectionUtils.isEmpty(response.getBody())) {
+            byte[] bodyTexts = new byte[response.getBody().size()];
+            for (Map.Entry<String, String> arg : response.getBody().entrySet()) {
+                int i = -1;
+                int key = builder.createString(arg.getKey());
+                int value = builder.createString(arg.getValue());
+                int text = DataEntry.createDataEntry(builder, key, value);
+                bodyTexts[++i] = (byte) text;
+            }
+            int body = Stop.createBodyVector(builder, bodyTexts);
+            Stop.addBody(builder, body);
+        }
+    }
+
+    private int buildRewriteResp(FlatBufferBuilder builder, HttpResponse response) {
+        Rewrite.startRewrite(builder);
+        if (null != response.getPath()) {
+            int path = builder.createString(response.getPath());
+            Rewrite.addPath(builder, path);
+        }
+        addHeaders(builder, response);
+        addArgs(builder, response);
+        return Rewrite.endRewrite(builder);
+    }
+
+    private void addArgs(FlatBufferBuilder builder, HttpResponse response) {
+        if (!CollectionUtils.isEmpty(response.getArgs())) {
+            int[] argTexts = new int[response.getArgs().size()];
+            for (Map.Entry<String, String> arg : response.getArgs().entrySet()) {
+                int i = -1;
+                int key = builder.createString(arg.getKey());
+                int value = builder.createString(arg.getValue());
+                int text = TextEntry.createTextEntry(builder, key, value);
+                argTexts[++i] = text;
+            }
+            int args = Rewrite.createArgsVector(builder, argTexts);
+            Rewrite.addArgs(builder, args);
+        }
+    }
+
+    private void addHeaders(FlatBufferBuilder builder, HttpResponse response) {
+        if (!CollectionUtils.isEmpty(response.getHeaders())) {
+            int[] headerTexts = new int[response.getHeaders().size()];
+            for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
+                int i = -1;
+                int key = builder.createString(header.getKey());
+                int value = builder.createString(header.getValue());
+                int text = TextEntry.createTextEntry(builder, key, value);
+                headerTexts[++i] = text;
+            }
+            int headers = Rewrite.createHeadersVector(builder, headerTexts);
+            Rewrite.addHeaders(builder, headers);
+        }
+    }
+
+    @Override
+    public FlatBufferBuilder builder() {
+        return this.builder;
+    }
+
+    @Override
+    public FrameType type() {
+        return TYPE;
+    }
+
+    @Override
+    public A6Response filter(A6Request req) {
+        return null;
+    }
+}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
index 8b0e102..b082768 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
@@ -18,8 +18,6 @@
 package org.apache.apisix.plugin.runner.handler;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Mono;
 import reactor.netty.NettyInbound;
@@ -27,18 +25,14 @@ import reactor.netty.NettyOutbound;
 
 @RequiredArgsConstructor
 public class IOHandler {
-    
-    private final PluginRunnerEncoder encoder;
-    
-    private final PluginRunnerDecoder decoder;
-    
-    private final Dispatcher dispatcher;
+
+    private final PayloadHandler payloadHandler;
     
     public Publisher<Void> handle(NettyInbound in, NettyOutbound out) {
-        return in.receive()
-                .map(decoder::decode)
-                .map(dispatcher::dispatch)
-                .flatMap(e -> out.sendByteArray(Mono.just(encoder.encode(e).array())).then());
+        return in.receive().asByteBuffer()
+                .map(payloadHandler::decode)
+                .map(payloadHandler::dispatch)
+                .flatMap(e -> out.sendByteArray(Mono.just(payloadHandler.encode(e).array())).then());
     }
     
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
index 519b7ec..8321a2b 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
@@ -17,8 +17,6 @@
 
 package org.apache.apisix.plugin.runner.handler;
 
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -26,8 +24,8 @@ import org.springframework.context.annotation.Configuration;
 public class IOHandlerConfiguration {
     
     @Bean
-    public IOHandler createIOHandler(PluginRunnerEncoder encoder, PluginRunnerDecoder decoder, Dispatcher dispatcher) {
-        return new IOHandler(encoder, decoder, dispatcher);
+    public IOHandler createIOHandler(PayloadHandler payloadHandler) {
+        return new IOHandler(payloadHandler);
     }
     
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
similarity index 78%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
index 1818184..74340e0 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
@@ -17,13 +17,14 @@
 
 package org.apache.apisix.plugin.runner.handler;
 
-import org.reactivestreams.Publisher;
-import reactor.netty.NettyInbound;
-import reactor.netty.NettyOutbound;
+import java.nio.ByteBuffer;
 
-@FunctionalInterface
-public interface ServerHandler {
-    
-    Publisher<Void> handler(NettyInbound in, NettyOutbound out);
-    
+public interface PayloadHandler {
+    RequestHandler decode(ByteBuffer buffer);
+
+    RequestHandler dispatch(RequestHandler handler);
+
+    ByteBuffer encode(RequestHandler handler);
+
+    RequestHandler error();
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
new file mode 100644
index 0000000..72c8cd1
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import com.google.flatbuffers.Table;
+import io.github.api7.A6.PrepareConf.Req;
+import io.github.api7.A6.PrepareConf.Resp;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Handles a {@code PrepareConf} RPC: caches the request that carries the plugin
+ * configuration and replies with the token under which it was stored.
+ */
+public class PrepareConfHandler implements RequestHandler {
+    private static final FrameType TYPE = FrameType.RPC_PREPARE_CONF;
+    private final Table req;
+
+    private final Cache<Long, Req> cache;
+
+    private FlatBufferBuilder builder;
+
+    /**
+     * @param body  FlatBuffers-encoded PrepareConf request
+     * @param cache store mapping a conf token to the request it was issued for
+     */
+    public PrepareConfHandler(ByteBuffer body, Cache<Long, Req> cache) {
+        this.req = Req.getRootAsReq(body);
+        this.cache = cache;
+    }
+
+    /**
+     * Stores the request in the cache and writes a {@code Resp} carrying its token.
+     *
+     * @param builder builder the response is serialized into
+     * @return the same builder, finished
+     */
+    @Override
+    public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+        this.builder = builder;
+        int token = confToken();
+        Resp.startResp(builder);
+        Resp.addConfToken(builder, token);
+        builder.finish(Resp.endResp(builder));
+        return builder;
+    }
+
+    /**
+     * Caches the request under a random non-negative token that is not already in use.
+     *
+     * @return the token the request was stored under
+     */
+    private int confToken() {
+        Req conf = (Req) this.req;
+        int token;
+        do {
+            token = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+            // putIfAbsent is atomic: a plain put() would silently replace the conf of
+            // another live token if the random value collided with it.
+        } while (cache.asMap().putIfAbsent((long) token, conf) != null);
+        return token;
+    }
+
+    @Override
+    public FlatBufferBuilder builder() {
+        return this.builder;
+    }
+
+    @Override
+    public FrameType type() {
+        return TYPE;
+    }
+}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
similarity index 72%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
index b79093a..3bc72a4 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.handler;
 
-import org.apache.apisix.plugin.runner.A6Response;
+import com.google.flatbuffers.FlatBufferBuilder;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
 
-import java.nio.ByteBuffer;
+public interface RequestHandler { // one decoded RPC request; PrepareConfHandler is an implementation
 
-@FunctionalInterface
-public interface PluginRunnerEncoder {
-    
-    ByteBuffer encode(A6Response response);
+    FlatBufferBuilder handler(FlatBufferBuilder builder); // PrepareConfHandler writes its reply into builder
+
+    FlatBufferBuilder builder(); // PrepareConfHandler returns the builder last given to handler()
+
+    FrameType type(); // frame type of the RPC, e.g. FrameType.RPC_PREPARE_CONF
 }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java
deleted file mode 100644
index be3928c..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.server.config;
-
-import org.apache.apisix.plugin.runner.handler.IOHandler;
-
-@FunctionalInterface
-public interface IOHandlerCustomizer {
-    
-    void customize(IOHandler handler);
-    
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
index cdb5f2b..ff3d166 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
@@ -19,19 +19,19 @@ package org.apache.apisix.plugin.runner.service;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import io.github.api7.A6.PrepareConf.Req;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Duration;
-import java.util.Properties;
 
 @Configuration
 public class CacheConfiguration {
     
     @Bean
-    public Cache<Long, Properties> configurationCache(@Value("${cache.config.expired:5000}") long expired,
-                                                        @Value("${cache.config.capacity:500}") int capacity) {
+    public Cache<Long, Req> configurationCache(@Value("${cache.config.expired:5000}") long expired,
+                                               @Value("${cache.config.capacity:500}") int capacity) {
         return CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMillis(expired)).maximumSize(capacity).build();
     }
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
similarity index 62%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java
rename to runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
index 98fb2c6..583b76f 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
@@ -17,25 +17,38 @@
 
 package org.apache.apisix.plugin.runner;
 
-import java.nio.ByteBuffer;
-import java.util.Map;
+import io.github.api7.A6.PrepareConf.Req;
+import io.github.api7.A6.TextEntry;
 
-public class A6HttpCallResponse implements A6Response {
-    
-    private final int requestId;
-    
-    private final Map<String, String> headers;
-    
-    private final Map<String, String> parameters;
-    
-    public A6HttpCallResponse(int requestId, Map<String, String> parameters, Map<String, String> headers) {
-        this.requestId = requestId;
-        this.headers = headers;
-        this.parameters = parameters;
+/**
+ * Read-only view over the plugin configuration entries of a PrepareConf request.
+ */
+public class A6Config {
+
+    private final Req req;
+
+    public A6Config(Req req) {
+        this.req = req;
     }
-    
-    @Override
-    public ByteBuffer encode() {
+
+    /**
+     * Looks up a configuration entry by name.
+     *
+     * @param key entry name to search for
+     * @return value of the first entry named {@code key}, or {@code null} when
+     *         {@code key} is {@code null} or no entry matches
+     */
+    public String get(String key) {
+        if (key == null) {
+            return null;
+        }
+        for (int i = 0, n = this.req.confLength(); i < n; i++) {
+            TextEntry conf = this.req.conf(i);
+            // key-first comparison: an entry serialized without a name must not throw
+            if (key.equals(conf.name())) {
+                return conf.value();
+            }
+        }
         return null;
     }
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
index 4a15a5e..dacd90c 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
@@ -17,17 +17,22 @@
 
 package org.apache.apisix.plugin.runner;
 
-import java.nio.ByteBuffer;
-
 public interface A6Response {
-    
-    ByteBuffer encode();
-    
-    enum Action {
-        
-        STOP,
-        REWRITE,
-        ERR
-        
+
+    enum ActionType { // NOTE(review): casing is mixed (NONE vs Stop/Rewrite); renaming would break callers
+
+        NONE((byte) 0),
+        Stop((byte) 1),
+        Rewrite((byte) 2);
+
+        private final byte type; // numeric code of the constant; presumably the wire-level action id - confirm
+
+        ActionType(byte type) {
+            this.type = type;
+        }
+
+        public byte getType() { // returns the code given in the constant's declaration
+            return type;
+        }
     }
 }
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
index 8e5163e..472ad54 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
@@ -19,80 +19,72 @@ package org.apache.apisix.plugin.runner;
 
 import io.github.api7.A6.HTTPReqCall.Req;
 
-import java.nio.ByteBuffer;
 import java.util.Enumeration;
 import java.util.Map;
-import java.util.Properties;
 
 // @Readable
 // TODO extends from HttpServletRequest? Netty's HttpRequest?
-public class HttpRequest implements A6Request {
-    
+public class HttpRequest {
+
     private final Req req;
-    
+
     private int id;
-    
+
     private String sourceIP;
-    
+
     private Method method;
-    
+
     private String path;
-    
+
     private Map<String, String> parameter;
-    
+
     private Map<String, String> headers;
-    
-    private String confToken;
-    
-    private Properties config;
-    
+
+    private final A6Config config;
+
     private Map<String, Object> data;
-    
-    HttpRequest(Req req) {
+
+    public HttpRequest(Req req, A6Config config) {
         this.req = req;
+        this.config = config;
     }
-    
+
     public long getRequestId() {
         return req.id();
     }
-    
+
     public String getSourceIP() {
         return "";
     }
-    
+
     public Method getMethod() {
         return Method.values()[req.method()];
     }
-    
+
     public String getPath() {
         return req.path();
     }
-    
+
     public String getParameter(String name) {
         return parameter.get(name);
     }
-    
+
     public Map getParameterMap() {
         return null;
     }
-    
+
     public Enumeration getParameters() {
         return null;
     }
-    
+
     public String[] getParameterValues(String name) {
         return null;
     }
-    
-    @Override
-    public int getConfToken() {
-        return 0;
-    }
-    
-    public static HttpRequest from(ByteBuffer buffer) {
-        return new HttpRequest(Req.getRootAsReq(buffer));
+
+    public A6Config getConf() {
+        return this.config;
     }
-    
+
     public enum Method {
         GET,
         HEAD,
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
index 1ed4688..33d37ac 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
@@ -17,9 +17,9 @@
 
 package org.apache.apisix.plugin.runner;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
 import lombok.Data;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -32,26 +32,40 @@ import java.util.Objects;
  */
 @Data
 public class HttpResponse implements A6Response {
-    
+
     private final int requestId;
-    
-    private Action action;
-    
+
+    private ActionType actionType; // replaces the former Action-typed field
+
     private Map<String, String> headers;
-    
+
+    private Map<String, String> args; // created by addArg() on first use if still null
+
+    private String path;
+
+    private Map<String, String> body; // NOTE(review): map-typed body; confirm what the encoder expects
+
+    private HttpResponseStatus status; // Netty's HttpResponseStatus type
+
     public HttpResponse(int requestId) {
         this.requestId = requestId;
     }
-    
+
     public void addHeader(String headerKey, String headerValue) {
         if (Objects.isNull(headers)) {
             headers = new HashMap<>();
         }
         headers.put(headerKey, headerValue);
     }
-    
-    @Override
-    public ByteBuffer encode() {
-        return null;
+
+    public void addArg(String argKey, String argValue) { // same lazy-init pattern as addHeader()
+        if (Objects.isNull(args)) {
+            args = new HashMap<>();
+        }
+        args.put(argKey, argValue);
+    }
+
+    public void setPath(String path) { // NOTE(review): @Data should already generate this setter - confirm
+        this.path = path;
     }
 }
diff --git a/runner-starter/src/main/resources/application.yaml b/runner-starter/src/main/resources/application.yaml
index 2dec244..2150f4c 100644
--- a/runner-starter/src/main/resources/application.yaml
+++ b/runner-starter/src/main/resources/application.yaml
@@ -18,5 +18,5 @@ logging:
     root: debug
 
 cache.config:
-  expired: 200
+  expired: 3600000 # milliseconds (1 hour); CacheConfiguration passes it to Duration.ofMillis
   capacity: 12000
\ No newline at end of file