You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/19 09:10:04 UTC
[apisix-java-plugin-runner] branch main updated: feat: adjust
protocol handling and refine logic (#3)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git
The following commit(s) were added to refs/heads/main by this push:
new 62c2e2c feat: adjust protocol handling and refine logic (#3)
62c2e2c is described below
commit 62c2e2c55460436589e52ebf6072639ab39085e3
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Wed May 19 17:09:58 2021 +0800
feat: adjust protocol handling and refine logic (#3)
---
pom.xml | 5 +
.../runner/codec/PluginRunnerConfiguration.java | 18 +-
.../plugin/runner/codec/frame/FrameCodec.java | 68 ++++++++
.../plugin/runner/codec/frame/FrameType.java | 27 +--
.../runner/codec/impl/FlatBuffersDecoder.java | 45 -----
.../runner/codec/impl/FlatBuffersEncoder.java | 31 ----
.../runner/handler/DefaultPayloadHandler.java | 99 +++++++++++
.../apisix/plugin/runner/handler/ErrHandler.java | 46 +++---
.../Filter.java} | 11 +-
.../plugin/runner/handler/HTTPReqCallHandler.java | 182 +++++++++++++++++++++
.../apisix/plugin/runner/handler/IOHandler.java | 18 +-
.../runner/handler/IOHandlerConfiguration.java | 6 +-
.../{ServerHandler.java => PayloadHandler.java} | 17 +-
.../plugin/runner/handler/PrepareConfHandler.java | 69 ++++++++
.../RequestHandler.java} | 16 +-
.../runner/server/config/IOHandlerCustomizer.java | 27 ---
.../plugin/runner/service/CacheConfiguration.java | 6 +-
.../{A6HttpCallResponse.java => A6Config.java} | 33 ++--
.../apache/apisix/plugin/runner/A6Response.java | 27 +--
.../apache/apisix/plugin/runner/HttpRequest.java | 60 +++----
.../apache/apisix/plugin/runner/HttpResponse.java | 36 ++--
runner-starter/src/main/resources/application.yaml | 2 +-
22 files changed, 588 insertions(+), 261 deletions(-)
diff --git a/pom.xml b/pom.xml
index c7beacb..0405e72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,11 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <classifier>linux-aarch_64</classifier>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
</dependency>
<dependency>
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
index ac314c1..03f26b4 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerConfiguration.java
@@ -17,22 +17,18 @@
package org.apache.apisix.plugin.runner.codec;
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersDecoder;
-import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersEncoder;
+import com.google.common.cache.Cache;
+import io.github.api7.A6.PrepareConf.Req;
+import org.apache.apisix.plugin.runner.handler.DefaultPayloadHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PluginRunnerConfiguration {
-
- @Bean
- public PluginRunnerDecoder createDecoder() {
- return new FlatBuffersDecoder();
- }
-
+
@Bean
- public PluginRunnerEncoder createEncoder() {
- return new FlatBuffersEncoder();
+ public DefaultPayloadHandler createPayloadHandler(Cache<Long, Req> cache) {
+ return new DefaultPayloadHandler(cache);
}
-
+
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java
new file mode 100644
index 0000000..b50e286
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.codec.frame;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Static helpers for the frame layout used on the wire: one type byte, a
+ * three-byte big-endian body length, then the body itself.
+ */
+public class FrameCodec {
+
+    /** Largest body length that fits in the three-byte length field. */
+    private static final int MAX_DATA_LENGTH = 0xFFFFFF;
+
+    /**
+     * Reads the three-byte length field starting at the payload's current
+     * position. The payload's position advances by three bytes.
+     *
+     * @param payload buffer positioned on the first length byte
+     * @return the decoded length, in the range 0..0xFFFFFF
+     */
+    public static int getDataLength(ByteBuffer payload) {
+        byte[] bytes = new byte[3];
+        for (int i = 0; i < 3; i++) {
+            bytes[i] = payload.get();
+        }
+        return byte3ToInt(bytes);
+    }
+
+    /**
+     * Returns a view of the frame body that follows the length field.
+     *
+     * @param payload buffer positioned on the first length byte
+     * @return a buffer sharing the payload's content, with position 0 and
+     *         limit equal to the decoded body length
+     */
+    public static ByteBuffer getBody(ByteBuffer payload) {
+        int length = getDataLength(payload);
+        ByteBuffer buffer = payload.slice();
+        // Reading `length` bytes moves the slice's position to the end of the
+        // body (and fails with BufferUnderflowException on a short frame);
+        // flip() then turns that position into the limit and rewinds to 0.
+        byte[] dst = new byte[length];
+        buffer.get(dst, 0, length);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Wraps the remaining bytes of {@code payload} in a frame. The payload is
+     * fully consumed.
+     *
+     * @param payload   body bytes, read from position to limit
+     * @param frameType type written into the first byte of the frame
+     * @return a new buffer, ready for reading, holding type, length and body
+     * @throws IllegalArgumentException if the body does not fit in the
+     *                                  three-byte length field
+     */
+    public static ByteBuffer setBody(ByteBuffer payload, FrameType frameType) {
+        byte[] data = new byte[payload.remaining()];
+        if (data.length > MAX_DATA_LENGTH) {
+            // A longer body would be written with a truncated length field and
+            // could not be decoded by the peer.
+            throw new IllegalArgumentException("frame body too large: " + data.length);
+        }
+        payload.get(data);
+        ByteBuffer buffer = ByteBuffer.allocate(data.length + 4);
+        buffer.put(frameType.getType());
+        // data length
+        buffer.put(intToByte3(data.length));
+        // data
+        buffer.put(data);
+        buffer.flip();
+        return buffer;
+    }
+
+    /** Encodes the low 24 bits of {@code i} as three bytes, most significant first. */
+    private static byte[] intToByte3(int i) {
+        byte[] targets = new byte[3];
+        targets[2] = (byte) (i & 0xFF);
+        targets[1] = (byte) (i >> 8 & 0xFF);
+        targets[0] = (byte) (i >> 16 & 0xFF);
+        return targets;
+    }
+
+    /** Decodes three bytes, most significant first, into a non-negative int. */
+    private static int byte3ToInt(byte[] bytes) {
+        // Each byte is masked before it is shifted: '<<' binds tighter than
+        // '&', so the unparenthesised form masks with (0xFF << 8) instead.
+        return (bytes[2] & 0xFF)
+                | ((bytes[1] & 0xFF) << 8)
+                | ((bytes[0] & 0xFF) << 16);
+    }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
similarity index 72%
copy from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
copy to runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
index 4a15a5e..118155b 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/frame/FrameType.java
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.apisix.plugin.runner;
+package org.apache.apisix.plugin.runner.codec.frame;
-import java.nio.ByteBuffer;
+public enum FrameType {
+ RPC_ERROR((byte) 0),
-public interface A6Response {
-
- ByteBuffer encode();
-
- enum Action {
-
- STOP,
- REWRITE,
- ERR
-
+ RPC_PREPARE_CONF((byte) 1),
+
+ RPC_HTTP_REQ_CALL((byte) 2);
+
+ private final byte type;
+
+ FrameType(byte type) {
+ this.type = type;
+ }
+
+ public byte getType() {
+ return type;
}
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
deleted file mode 100644
index a551f50..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersDecoder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.codec.impl;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.apisix.plugin.runner.A6ConfigRequest;
-import org.apache.apisix.plugin.runner.A6Request;
-import org.apache.apisix.plugin.runner.HttpRequest;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-
-import java.nio.ByteBuffer;
-
-public class FlatBuffersDecoder implements PluginRunnerDecoder {
-
- @Override
- public A6Request decode(ByteBuf buf) {
- final byte type = buf.readByte();
- final short length = buf.readShort();
-
- ByteBuffer buffer = ByteBuffer.allocate(length);
- buf.getBytes(length, buffer);
- buffer.flip();
-
- if (type == 0) {
- return A6ConfigRequest.from(buffer);
- } else {
- return HttpRequest.from(buffer);
- }
- }
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
deleted file mode 100644
index 4e04ebf..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/impl/FlatBuffersEncoder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.codec.impl;
-
-import org.apache.apisix.plugin.runner.A6Response;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
-
-import java.nio.ByteBuffer;
-
-public class FlatBuffersEncoder implements PluginRunnerEncoder {
-
- @Override
- public ByteBuffer encode(A6Response response) {
- return response.encode();
- }
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java
new file mode 100644
index 0000000..90b8eed
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/DefaultPayloadHandler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.Err.Code;
+import io.github.api7.A6.PrepareConf.Req;
+import lombok.RequiredArgsConstructor;
+import org.apache.apisix.plugin.runner.codec.frame.FrameCodec;
+import org.apache.apisix.plugin.runner.filter.FilterBean;
+import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.ObjectProvider;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link PayloadHandler}: decodes a frame into a
+ * {@link RequestHandler}, runs it against a fresh {@link FlatBufferBuilder}
+ * and frames the result.
+ */
+@RequiredArgsConstructor
+public class DefaultPayloadHandler implements PayloadHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultPayloadHandler.class);
+
+    // NOTE(review): kept only because the field is package-visible; nothing in
+    // this class reads it any more. Confirm no other class does, then remove.
+    FlatBufferBuilder builder;
+
+    private final Cache<Long, Req> cache;
+
+    /**
+     * Reads the type byte and the body from {@code buffer} and picks the
+     * matching handler.
+     *
+     * @param buffer one complete frame, positioned on the type byte
+     * @return the handler for the frame, or an error handler carrying
+     *         {@code BAD_REQUEST} when the type is not supported
+     */
+    @Override
+    public RequestHandler decode(ByteBuffer buffer) {
+        byte type = buffer.get();
+        ByteBuffer body = FrameCodec.getBody(buffer);
+
+        switch (type) {
+            case 1: // FrameType.RPC_PREPARE_CONF
+                return new PrepareConfHandler(body, cache);
+            case 2: // FrameType.RPC_HTTP_REQ_CALL
+                // TODO: pass createFilterChain(...) once a FilterBean provider is wired in.
+                return new HTTPReqCallHandler(body, cache, null);
+            default:
+                break;
+        }
+
+        logger.error("receiving unsupport type: {}", type);
+        return error(Code.BAD_REQUEST);
+    }
+
+    /**
+     * Links the provider's filters, in order, into a chain.
+     *
+     * @return the head of the chain, or {@code null} when there are no filters
+     */
+    private FilterChain createFilterChain(ObjectProvider<FilterBean> beanProvider) {
+        List<FilterBean> filterList = beanProvider.orderedStream().collect(Collectors.toList());
+        FilterChain chain = null;
+        // Built back to front so the first filter ends up at the head.
+        for (int i = filterList.size() - 1; i >= 0; i--) {
+            chain = new FilterChain(filterList.get(i), chain);
+        }
+        return chain;
+    }
+
+    /**
+     * Frames the response the given handler has built.
+     *
+     * @param handler a handler that {@link #dispatch} has already run
+     * @return the framed response
+     */
+    @Override
+    public ByteBuffer encode(RequestHandler handler) {
+        // Use the handler that travelled through the pipeline, not shared
+        // instance state, so the response always belongs to this request.
+        ByteBuffer buffer = handler.builder().dataBuffer();
+        return FrameCodec.setBody(buffer, handler.type());
+    }
+
+    @Override
+    public RequestHandler error() {
+        logger.error("process rpc call error");
+        return error(Code.SERVICE_UNAVAILABLE);
+    }
+
+    /**
+     * Runs the handler against a new builder.
+     *
+     * @param handler the handler returned by {@link #decode}
+     * @return the same handler, now holding its response
+     */
+    @Override
+    public RequestHandler dispatch(RequestHandler handler) {
+        // Runs the argument rather than a field: decode()'s error path
+        // returns a handler too, and it must be the one that is executed.
+        builder = handler.handler(new FlatBufferBuilder());
+        return handler;
+    }
+
+    private RequestHandler error(int code) {
+        return new ErrHandler(code);
+    }
+
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
similarity index 50%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
index 5d1b669..cd8a76a 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigRequest.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ErrHandler.java
@@ -15,33 +15,39 @@
* limitations under the License.
*/
-package org.apache.apisix.plugin.runner;
+package org.apache.apisix.plugin.runner.handler;
-import io.github.api7.A6.PrepareConf.Req;
+import com.google.flatbuffers.FlatBufferBuilder;
+import io.github.api7.A6.Err.Resp;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
-import java.nio.ByteBuffer;
+public class ErrHandler implements RequestHandler {
+ private static final FrameType TYPE = FrameType.RPC_ERROR;
-public class A6ConfigRequest implements A6Request {
-
- private final int confToken;
-
- public A6ConfigRequest(int confToken) {
- this.confToken = confToken;
+ private int code = -1;
+ private FlatBufferBuilder builder;
+
+ public ErrHandler(int code) {
+ this.code = code;
}
-
+
@Override
- public boolean isConfigRequest() {
- return true;
+ public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+ this.builder = builder;
+ Resp.startResp(builder);
+ Resp.addCode(builder, code == -1 ? 1 : code);
+ int orc = Resp.endResp(builder);
+ builder.finish(orc);
+ return builder;
}
-
+
@Override
- public int getConfToken() {
- return confToken;
+ public FlatBufferBuilder builder() {
+ return this.builder;
}
-
- public static A6ConfigRequest from(ByteBuffer buffer) {
- // TODO request id and confToken came from client?
- Req req = Req.getRootAsReq(buffer);
- return new A6ConfigRequest(0);
+
+ @Override
+ public FrameType type() {
+ return TYPE;
}
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
similarity index 82%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
index fe4221b..6f79e81 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerDecoder.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/Filter.java
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.handler;
-import io.netty.buffer.ByteBuf;
import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.A6Response;
-@FunctionalInterface
-public interface PluginRunnerDecoder {
-
- A6Request decode(ByteBuf buf);
-
+public interface Filter {
+ A6Response filter(A6Request req);
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
new file mode 100644
index 0000000..f4e3fcc
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/HTTPReqCallHandler.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import com.google.flatbuffers.Table;
+import io.github.api7.A6.DataEntry;
+import io.github.api7.A6.Err.Code;
+import io.github.api7.A6.HTTPReqCall.Req;
+import io.github.api7.A6.HTTPReqCall.Resp;
+import io.github.api7.A6.HTTPReqCall.Rewrite;
+import io.github.api7.A6.HTTPReqCall.Stop;
+import io.github.api7.A6.TextEntry;
+import org.apache.apisix.plugin.runner.A6Config;
+import org.apache.apisix.plugin.runner.A6Request;
+import org.apache.apisix.plugin.runner.A6Response;
+import org.apache.apisix.plugin.runner.HttpRequest;
+import org.apache.apisix.plugin.runner.HttpResponse;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
+import org.apache.apisix.plugin.runner.filter.FilterChain;
+import org.springframework.util.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Handles an RPC_HTTP_REQ_CALL frame: looks up the configuration cached for
+ * the request's conf token and builds the response.
+ */
+public class HTTPReqCallHandler implements RequestHandler, Filter {
+
+    private static final FrameType TYPE = FrameType.RPC_HTTP_REQ_CALL;
+    private final Table req;
+    private FlatBufferBuilder builder;
+    private final Cache<Long, io.github.api7.A6.PrepareConf.Req> cache;
+    private final FilterChain chain;
+    // Frame type of the response; switches to RPC_ERROR when handler() writes an Err.Resp.
+    private FrameType responseType = TYPE;
+
+    public HTTPReqCallHandler(ByteBuffer body,
+                              Cache<Long, io.github.api7.A6.PrepareConf.Req> cache,
+                              FilterChain chain) {
+        this.req = Req.getRootAsReq(body);
+        this.cache = cache;
+        this.chain = chain;
+    }
+
+    /**
+     * Writes the response for this request into {@code builder}.
+     *
+     * @param builder builder to write into; also remembered for {@link #builder()}
+     * @return the same builder, finished
+     */
+    @Override
+    public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+        this.builder = builder;
+        Req req = (Req) this.req;
+
+        io.github.api7.A6.PrepareConf.Req conf = cache.getIfPresent(req.confToken());
+        if (null == conf) {
+            // The payload is an Err.Resp, so it is framed as RPC_ERROR, as ErrHandler does.
+            responseType = FrameType.RPC_ERROR;
+            io.github.api7.A6.Err.Resp.startResp(builder);
+            io.github.api7.A6.Err.Resp.addCode(builder, Code.CONF_TOKEN_NOT_FOUND);
+            int orc = io.github.api7.A6.Err.Resp.endResp(builder);
+            builder.finish(orc);
+            return builder;
+        }
+
+        A6Config config = new A6Config(conf);
+        HttpRequest request = new HttpRequest(req, config);
+        HttpResponse response = new HttpResponse((int) req.id());
+        // TODO: run the filter chain once it is wired in: chain.doFilter(request, response);
+        return buildRes(builder, response);
+    }
+
+    /** Serialises {@code response} as an HTTPReqCall.Resp and finishes the builder. */
+    private FlatBufferBuilder buildRes(FlatBufferBuilder builder, HttpResponse response) {
+
+        if (null == response.getActionType()) {
+            response.setActionType(A6Response.ActionType.NONE);
+        }
+
+        // The action table is complete before the enclosing Resp is started.
+        int action = 0;
+        if (response.getActionType() == A6Response.ActionType.Rewrite) {
+            action = buildRewriteResp(builder, response);
+        } else if (response.getActionType() == A6Response.ActionType.Stop) {
+            action = buildStopResp(builder, response);
+        }
+
+        Resp.startResp(builder);
+        Resp.addAction(builder, action);
+        Resp.addActionType(builder, response.getActionType().getType());
+        Resp.addId(builder, response.getRequestId());
+        builder.finish(Resp.endResp(builder));
+        return builder;
+    }
+
+    /**
+     * Builds the Stop table.
+     *
+     * @return the offset of the finished table
+     */
+    private int buildStopResp(FlatBufferBuilder builder, HttpResponse response) {
+        // FlatBufferBuilder rejects nested construction, so every string and
+        // vector is finished before startStop() opens the table.
+        int[] headerTexts = createTextEntries(builder, response.getHeaders());
+        int headers = headerTexts.length == 0 ? 0 : Stop.createHeadersVector(builder, headerTexts);
+        int body = createBody(builder, response);
+
+        Stop.startStop(builder);
+        if (headers != 0) {
+            Stop.addHeaders(builder, headers);
+        }
+        if (body != 0) {
+            Stop.addBody(builder, body);
+        }
+        Stop.addStatus(builder, response.getStatus().code());
+        return Stop.endStop(builder);
+    }
+
+    /**
+     * Builds the Stop body vector.
+     *
+     * @return the vector offset, or 0 when the response has no body
+     */
+    private int createBody(FlatBufferBuilder builder, HttpResponse response) {
+        if (CollectionUtils.isEmpty(response.getBody())) {
+            return 0;
+        }
+        byte[] bodyTexts = new byte[response.getBody().size()];
+        int i = 0; // declared outside the loop so each entry gets its own slot
+        for (Map.Entry<String, String> arg : response.getBody().entrySet()) {
+            int key = builder.createString(arg.getKey());
+            int value = builder.createString(arg.getValue());
+            // NOTE(review): narrowing a DataEntry offset to a byte looks
+            // unintended - confirm the expected encoding of the body.
+            bodyTexts[i++] = (byte) DataEntry.createDataEntry(builder, key, value);
+        }
+        return Stop.createBodyVector(builder, bodyTexts);
+    }
+
+    /**
+     * Builds the Rewrite table.
+     *
+     * @return the offset of the finished table
+     */
+    private int buildRewriteResp(FlatBufferBuilder builder, HttpResponse response) {
+        // Same ordering rule as buildStopResp: offsets first, table fields afterwards.
+        int path = null == response.getPath() ? 0 : builder.createString(response.getPath());
+        int[] headerTexts = createTextEntries(builder, response.getHeaders());
+        int headers = headerTexts.length == 0 ? 0 : Rewrite.createHeadersVector(builder, headerTexts);
+        int[] argTexts = createTextEntries(builder, response.getArgs());
+        int args = argTexts.length == 0 ? 0 : Rewrite.createArgsVector(builder, argTexts);
+
+        Rewrite.startRewrite(builder);
+        if (path != 0) {
+            Rewrite.addPath(builder, path);
+        }
+        if (headers != 0) {
+            Rewrite.addHeaders(builder, headers);
+        }
+        if (args != 0) {
+            Rewrite.addArgs(builder, args);
+        }
+        return Rewrite.endRewrite(builder);
+    }
+
+    /**
+     * Creates one TextEntry per map entry.
+     *
+     * @return the entry offsets in iteration order; empty when the map is
+     *         null or empty
+     */
+    private int[] createTextEntries(FlatBufferBuilder builder, Map<String, String> entries) {
+        if (CollectionUtils.isEmpty(entries)) {
+            return new int[0];
+        }
+        int[] offsets = new int[entries.size()];
+        int i = 0; // declared outside the loop so each entry gets its own slot
+        for (Map.Entry<String, String> entry : entries.entrySet()) {
+            int key = builder.createString(entry.getKey());
+            int value = builder.createString(entry.getValue());
+            offsets[i++] = TextEntry.createTextEntry(builder, key, value);
+        }
+        return offsets;
+    }
+
+    @Override
+    public FlatBufferBuilder builder() {
+        return this.builder;
+    }
+
+    /** Frame type of the response: RPC_HTTP_REQ_CALL, or RPC_ERROR after a failed token lookup. */
+    @Override
+    public FrameType type() {
+        return responseType;
+    }
+
+    // Placeholder: filtering is not implemented yet and always yields null.
+    @Override
+    public A6Response filter(A6Request req) {
+        return null;
+    }
+}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
index 8b0e102..b082768 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandler.java
@@ -18,8 +18,6 @@
package org.apache.apisix.plugin.runner.handler;
import lombok.RequiredArgsConstructor;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
@@ -27,18 +25,14 @@ import reactor.netty.NettyOutbound;
@RequiredArgsConstructor
public class IOHandler {
-
- private final PluginRunnerEncoder encoder;
-
- private final PluginRunnerDecoder decoder;
-
- private final Dispatcher dispatcher;
+
+ private final PayloadHandler payloadHandler;
public Publisher<Void> handle(NettyInbound in, NettyOutbound out) {
- return in.receive()
- .map(decoder::decode)
- .map(dispatcher::dispatch)
- .flatMap(e -> out.sendByteArray(Mono.just(encoder.encode(e).array())).then());
+ return in.receive().asByteBuffer()
+ .map(payloadHandler::decode)
+ .map(payloadHandler::dispatch)
+ .flatMap(e -> out.sendByteArray(Mono.just(payloadHandler.encode(e).array())).then());
}
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
index 519b7ec..8321a2b 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/IOHandlerConfiguration.java
@@ -17,8 +17,6 @@
package org.apache.apisix.plugin.runner.handler;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
-import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -26,8 +24,8 @@ import org.springframework.context.annotation.Configuration;
public class IOHandlerConfiguration {
@Bean
- public IOHandler createIOHandler(PluginRunnerEncoder encoder, PluginRunnerDecoder decoder, Dispatcher dispatcher) {
- return new IOHandler(encoder, decoder, dispatcher);
+ public IOHandler createIOHandler(PayloadHandler payloadHandler) {
+ return new IOHandler(payloadHandler);
}
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
similarity index 78%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
index 1818184..74340e0 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ServerHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PayloadHandler.java
@@ -17,13 +17,14 @@
package org.apache.apisix.plugin.runner.handler;
-import org.reactivestreams.Publisher;
-import reactor.netty.NettyInbound;
-import reactor.netty.NettyOutbound;
+import java.nio.ByteBuffer;
-@FunctionalInterface
-public interface ServerHandler {
-
- Publisher<Void> handler(NettyInbound in, NettyOutbound out);
-
+public interface PayloadHandler {
+ RequestHandler decode(ByteBuffer buffer);
+
+ RequestHandler dispatch(RequestHandler handler);
+
+ ByteBuffer encode(RequestHandler handler);
+
+ RequestHandler error();
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
new file mode 100644
index 0000000..72c8cd1
--- /dev/null
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import com.google.common.cache.Cache;
+import com.google.flatbuffers.FlatBufferBuilder;
+import com.google.flatbuffers.Table;
+import io.github.api7.A6.PrepareConf.Req;
+import io.github.api7.A6.PrepareConf.Resp;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Handles an RPC_PREPARE_CONF frame: caches the received configuration under
+ * a freshly generated token and replies with that token.
+ */
+public class PrepareConfHandler implements RequestHandler {
+    private static final FrameType TYPE = FrameType.RPC_PREPARE_CONF;
+    private final Table req;
+
+    private final Cache<Long, Req> cache;
+
+    private FlatBufferBuilder builder;
+
+    public PrepareConfHandler(ByteBuffer body, Cache<Long, Req> cache) {
+        this.req = Req.getRootAsReq(body);
+        this.cache = cache;
+    }
+
+    /**
+     * Stores the configuration and writes a PrepareConf.Resp carrying its token.
+     *
+     * @param builder builder to write into; also remembered for {@link #builder()}
+     * @return the same builder, finished
+     */
+    @Override
+    public FlatBufferBuilder handler(FlatBufferBuilder builder) {
+        this.builder = builder;
+        int token = confToken();
+        Resp.startResp(builder);
+        Resp.addConfToken(builder, token);
+        builder.finish(Resp.endResp(builder));
+        return builder;
+    }
+
+    /**
+     * Caches the request under a random non-negative token that is not
+     * already in use.
+     *
+     * @return the token the configuration was stored under
+     */
+    private int confToken() {
+        Req req = (Req) this.req;
+        int token;
+        do {
+            token = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+            // putIfAbsent is atomic: a token that is already cached is never
+            // overwritten, so a collision costs a retry, not another conf.
+        } while (cache.asMap().putIfAbsent((long) token, req) != null);
+        return token;
+    }
+
+    @Override
+    public FlatBufferBuilder builder() {
+        return this.builder;
+    }
+
+    @Override
+    public FrameType type() {
+        return TYPE;
+    }
+}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
similarity index 72%
rename from runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
rename to runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
index b79093a..3bc72a4 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/codec/PluginRunnerEncoder.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RequestHandler.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.apisix.plugin.runner.codec;
+package org.apache.apisix.plugin.runner.handler;
-import org.apache.apisix.plugin.runner.A6Response;
+import com.google.flatbuffers.FlatBufferBuilder;
+import org.apache.apisix.plugin.runner.codec.frame.FrameType;
-import java.nio.ByteBuffer;
+public interface RequestHandler {
-@FunctionalInterface
-public interface PluginRunnerEncoder {
-
- ByteBuffer encode(A6Response response);
+ FlatBufferBuilder handler(FlatBufferBuilder builder);
+
+ FlatBufferBuilder builder();
+
+ FrameType type();
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java
deleted file mode 100644
index be3928c..0000000
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/config/IOHandlerCustomizer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.apisix.plugin.runner.server.config;
-
-import org.apache.apisix.plugin.runner.handler.IOHandler;
-
-@FunctionalInterface
-public interface IOHandlerCustomizer {
-
- void customize(IOHandler handler);
-
-}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
index cdb5f2b..ff3d166 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/service/CacheConfiguration.java
@@ -19,19 +19,19 @@ package org.apache.apisix.plugin.runner.service;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import io.github.api7.A6.PrepareConf.Req;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
-import java.util.Properties;
@Configuration
public class CacheConfiguration {
+ /**
+ * Creates the cache of PrepareConf requests keyed by a Long.
+ * Entries expire a fixed time after being written, and the number of
+ * entries is bounded.
+ *
+ * @param expired time-to-live after write, in milliseconds
+ * (property cache.config.expired, default 5000)
+ * @param capacity maximum number of entries
+ * (property cache.config.capacity, default 500)
+ * @return the configured cache
+ */
@Bean
- public Cache<Long, Properties> configurationCache(@Value("${cache.config.expired:5000}") long expired,
- @Value("${cache.config.capacity:500}") int capacity) {
+ public Cache<Long, Req> configurationCache(@Value("${cache.config.expired:5000}") long expired,
+ @Value("${cache.config.capacity:500}") int capacity) {
return CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMillis(expired)).maximumSize(capacity).build();
}
}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
similarity index 62%
rename from runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java
rename to runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
index 98fb2c6..583b76f 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6HttpCallResponse.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Config.java
@@ -17,25 +17,24 @@
package org.apache.apisix.plugin.runner;
-import java.nio.ByteBuffer;
-import java.util.Map;
+import io.github.api7.A6.PrepareConf.Req;
+import io.github.api7.A6.TextEntry;
-public class A6HttpCallResponse implements A6Response {
-
- private final int requestId;
-
- private final Map<String, String> headers;
-
- private final Map<String, String> parameters;
-
- public A6HttpCallResponse(int requestId, Map<String, String> parameters, Map<String, String> headers) {
- this.requestId = requestId;
- this.headers = headers;
- this.parameters = parameters;
+/**
+ * Read-only view over the conf entries of a PrepareConf request.
+ */
+public class A6Config {
+
+ private final Req req;
+
+ /**
+ * @param req the PrepareConf request whose conf entries are exposed
+ */
+ public A6Config(Req req) {
+ this.req = req;
}
-
- @Override
- public ByteBuffer encode() {
+
+ /**
+ * Scans the conf entries in order and returns the value of the first
+ * entry whose name equals the given key.
+ *
+ * @param key the entry name to look for
+ * @return the matching entry's value, or null when no entry matches
+ */
+ public String get(String key) {
+ final int entryCount = this.req.confLength();
+ for (int idx = 0; idx < entryCount; idx++) {
+ final TextEntry entry = this.req.conf(idx);
+ if (!entry.name().equals(key)) {
+ continue;
+ }
+ return entry.value();
+ }
return null;
}
}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
index 4a15a5e..dacd90c 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6Response.java
@@ -17,17 +17,22 @@
package org.apache.apisix.plugin.runner;
-import java.nio.ByteBuffer;
-
public interface A6Response {
-
- ByteBuffer encode();
-
- enum Action {
-
- STOP,
- REWRITE,
- ERR
-
+
+ /**
+ * Action kinds, each paired with a fixed byte code
+ * (NONE = 0, Stop = 1, Rewrite = 2).
+ */
+ enum ActionType {
+
+ NONE((byte) 0),
+ Stop((byte) 1),
+ Rewrite((byte) 2);
+
+ private final byte type;
+
+ ActionType(byte code) {
+ this.type = code;
+ }
+
+ /**
+ * @return the byte code assigned to this constant
+ */
+ public byte getType() {
+ return this.type;
+ }
}
}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
index 8e5163e..472ad54 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpRequest.java
@@ -19,80 +19,72 @@ package org.apache.apisix.plugin.runner;
import io.github.api7.A6.HTTPReqCall.Req;
-import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.Map;
-import java.util.Properties;
// @Readable
// TODO extends from HttpServletRequest? Netty's HttpRequest?
-public class HttpRequest implements A6Request {
-
+public class HttpRequest {
+
private final Req req;
-
+
private int id;
-
+
private String sourceIP;
-
+
private Method method;
-
+
private String path;
-
+
private Map<String, String> parameter;
-
+
private Map<String, String> headers;
-
- private String confToken;
-
- private Properties config;
-
+
+ private final A6Config config;
+
private Map<String, Object> data;
-
- HttpRequest(Req req) {
+ /**
+ * Creates a request backed by the given HTTPReqCall request and plugin configuration.
+ *
+ * @param req the underlying HTTPReqCall request
+ * @param config the configuration returned by getConf()
+ */
+ public HttpRequest(Req req, A6Config config) {
this.req = req;
+ this.config = config;
}
-
+
+ /**
+ * @return the id reported by the underlying request (req.id())
+ */
public long getRequestId() {
return req.id();
}
-
+
+ /**
+ * Currently always returns an empty string; the sourceIP field is not consulted.
+ */
public String getSourceIP() {
return "";
}
-
+
+ /**
+ * Resolves the request's numeric method code to the Method constant
+ * declared at that position.
+ */
public Method getMethod() {
return Method.values()[req.method()];
}
-
+
+ /**
+ * @return the path reported by the underlying request (req.path())
+ */
public String getPath() {
return req.path();
}
-
+
+ /**
+ * Looks up the given name in the parameter map.
+ * NOTE(review): no assignment to the parameter field is visible in this
+ * part of the class; if it is never populated this call dereferences
+ * null - confirm.
+ */
public String getParameter(String name) {
return parameter.get(name);
}
-
+
+ /**
+ * Placeholder: currently always returns null.
+ */
public Map getParameterMap() {
return null;
}
-
+
+ /**
+ * Placeholder: currently always returns null.
+ */
public Enumeration getParameters() {
return null;
}
-
+
+ /**
+ * Placeholder: currently always returns null, whatever name is passed.
+ */
public String[] getParameterValues(String name) {
return null;
}
-
- @Override
- public int getConfToken() {
- return 0;
- }
-
- public static HttpRequest from(ByteBuffer buffer) {
- return new HttpRequest(Req.getRootAsReq(buffer));
+
+ /**
+ * @return the configuration object supplied to the constructor
+ */
+ public A6Config getConf() {
+ return config;
}
-
+
public enum Method {
GET,
HEAD,
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
index 1ed4688..33d37ac 100644
--- a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/HttpResponse.java
@@ -17,9 +17,9 @@
package org.apache.apisix.plugin.runner;
+import io.netty.handler.codec.http.HttpResponseStatus;
import lombok.Data;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -32,26 +32,40 @@ import java.util.Objects;
*/
@Data
public class HttpResponse implements A6Response {
-
+
private final int requestId;
-
- private Action action;
-
+
+ private ActionType actionType;
+
private Map<String, String> headers;
-
+
+ private Map<String, String> args;
+
+ private String path;
+
+ private Map<String, String> body;
+
+ private HttpResponseStatus status;
+
public HttpResponse(int requestId) {
this.requestId = requestId;
}
-
+
+ /**
+ * Puts a header entry, creating the headers map on first use.
+ *
+ * @param headerKey header name
+ * @param headerValue header value; replaces any value already stored under the same name
+ */
public void addHeader(String headerKey, String headerValue) {
if (Objects.isNull(headers)) {
headers = new HashMap<>();
}
headers.put(headerKey, headerValue);
}
-
- @Override
- public ByteBuffer encode() {
- return null;
+
+ /**
+ * Puts an argument entry, creating the args map on first use.
+ *
+ * @param argKey argument name
+ * @param argValue argument value; replaces any value already stored under the same name
+ */
+ public void addArg(String argKey, String argValue) {
+ if (Objects.isNull(args)) {
+ args = new HashMap<>();
+ }
+ args.put(argKey, argValue);
+ }
+
+ /**
+ * Sets the path field.
+ * NOTE(review): the class is annotated with @Data, which presumably
+ * already provides this setter - confirm whether the explicit method is needed.
+ *
+ * @param path the new path value
+ */
+ public void setPath(String path) {
+ this.path = path;
}
}
diff --git a/runner-starter/src/main/resources/application.yaml b/runner-starter/src/main/resources/application.yaml
index 2dec244..2150f4c 100644
--- a/runner-starter/src/main/resources/application.yaml
+++ b/runner-starter/src/main/resources/application.yaml
@@ -18,5 +18,5 @@ logging:
root: debug
cache.config:
- expired: 200
+ # Entry time-to-live after write, in milliseconds (3600000 ms = 1 hour).
+ expired: 3600000
capacity: 12000
\ No newline at end of file