You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by ch...@apache.org on 2022/10/31 09:53:43 UTC
[apisix-java-plugin-runner] 01/01: feat: support watching config for filters
This is an automated email from the ASF dual-hosted git repository.
chenjunxu pushed a commit to branch feat/watch
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git
commit f5ea54f4964caed618753a30db0936c0aae02819
Author: nic-chen <ch...@gmail.com>
AuthorDate: Mon Oct 31 17:53:34 2022 +0800
feat: support watching config for filters
---
.../plugin/runner/handler/PrepareConfHandler.java | 5 ++++
.../plugin/runner/server/ApplicationRunner.java | 20 ++++++++++-----
.../plugin/runner/handler/A6ConfigHandlerTest.java | 29 ++++++++++++++--------
.../runner/handler/A6HttpCallHandlerTest.java | 9 +++++--
.../plugin/runner/handler/ExtraInfoTest.java | 9 +++++--
.../plugin/runner/handler/PostFilterTest.java | 10 ++++++--
.../apisix/plugin/runner/handler/TestWatcher.java | 24 ++++++++++++++++++
.../apisix/plugin/runner/A6ConfigWatcher.java | 17 +++++++++++++
8 files changed, 101 insertions(+), 22 deletions(-)
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
index 799208a..6a7e9fa 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -26,6 +26,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;
import org.apache.apisix.plugin.runner.constants.Constants;
@@ -48,6 +49,7 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
private final Cache<Long, A6Conf> cache;
private final Map<String, PluginFilter> filters;
+ private final List<A6ConfigWatcher> watchers;
@Override
protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
@@ -75,6 +77,9 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
}
A6Conf a6Conf = new A6Conf(config, chain);
cache.put(token, a6Conf);
+ for (A6ConfigWatcher watcher : watchers) {
+ watcher.watch(token, a6Conf);
+ }
ctx.write(response);
ctx.writeAndFlush(response);
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
index 8c6ae48..865983a 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
@@ -47,7 +47,9 @@ import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
+
import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
import org.apache.apisix.plugin.runner.handler.RpcCallHandler;
@@ -67,21 +69,27 @@ public class ApplicationRunner implements CommandLineRunner {
private Cache<Long, A6Conf> cache;
- private ObjectProvider<PluginFilter> beanProvider;
+ private ObjectProvider<PluginFilter> filterProvider;
+ private ObjectProvider<A6ConfigWatcher> watcherProvider;
@Autowired
- public ApplicationRunner(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+ public ApplicationRunner(Cache<Long, A6Conf> cache,
+ ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
this.cache = cache;
- this.beanProvider = beanProvider;
+ this.filterProvider = filterProvider;
+ this.watcherProvider = watcherProvider;
}
- public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+ public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache,
+ ObjectProvider<PluginFilter> beanProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
Map<String, PluginFilter> filterMap = new HashMap<>();
for (PluginFilter filter : pluginFilterList) {
filterMap.put(filter.name(), filter);
}
- return new PrepareConfHandler(cache, filterMap);
+ List<A6ConfigWatcher> configWatcherList = watcherProvider.orderedStream().collect(Collectors.toList());
+
+ return new PrepareConfHandler(cache, filterMap, configWatcherList);
}
public RpcCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
@@ -123,7 +131,7 @@ public class ApplicationRunner implements CommandLineRunner {
.addAfter("logger", "payloadEncoder", new PayloadEncoder())
.addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
.addLast("payloadDecoder", new PayloadDecoder())
- .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
+ .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider))
.addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
.addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
index 0f33d68..ced6711 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
@@ -17,6 +17,16 @@
package org.apache.apisix.plugin.runner.handler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.flatbuffers.FlatBufferBuilder;
@@ -27,19 +37,11 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
@DisplayName("test add filter")
class A6ConfigHandlerTest {
@@ -48,8 +50,12 @@ class A6ConfigHandlerTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
PrepareConfHandler prepareConfHandler;
+ TestWatcher tWatcher = new TestWatcher() ;
+
@BeforeEach
void setUp() {
filters = new HashMap<>();
@@ -96,8 +102,10 @@ class A6ConfigHandlerTest {
return null;
}
});
+ watchers = new ArrayList<>();
+ watchers.add(tWatcher);
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
}
@Test
@@ -124,6 +132,7 @@ class A6ConfigHandlerTest {
Assertions.assertEquals(config.getChain().getFilters().size(), 1);
Assertions.assertEquals(config.getChain().getIndex(), 0);
Assertions.assertEquals(config.get("FooFilter"), "Bar");
+ Assertions.assertEquals(tWatcher.getConfig(), config.getConfig());
}
@Test
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
index b6a81e2..d7360cb 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
@@ -28,6 +28,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.A6ErrResponse;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,6 +60,8 @@ class A6HttpCallHandlerTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -149,6 +153,7 @@ class A6HttpCallHandlerTest {
return null;
}
});
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -167,14 +172,14 @@ class A6HttpCallHandlerTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
index d921858..fafc4e4 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
@@ -26,6 +26,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.ExtraInfoRequest;
import org.apache.apisix.plugin.runner.ExtraInfoResponse;
import org.apache.apisix.plugin.runner.HttpRequest;
@@ -41,6 +42,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,6 +63,8 @@ class ExtraInfoTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -127,6 +131,7 @@ class ExtraInfoTest {
return true;
}
});
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -145,14 +150,14 @@ class ExtraInfoTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
index e2de6dc..98b7e9e 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.PostRequest;
import org.apache.apisix.plugin.runner.PostResponse;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -37,7 +38,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -52,6 +55,8 @@ public class PostFilterTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -86,6 +91,7 @@ public class PostFilterTest {
}
}
);
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -100,14 +106,14 @@ public class PostFilterTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
new file mode 100644
index 0000000..e01d41c
--- /dev/null
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
@@ -0,0 +1,24 @@
+package org.apache.apisix.plugin.runner.handler;
+
+import java.util.Map;
+
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
+
+class TestWatcher implements A6ConfigWatcher {
+ public Map<String, String> config;
+
+ public Map<String, String> getConfig() {
+ return config;
+ }
+
+ @Override
+ public String name() {
+ return "test";
+ }
+
+ @Override
+ public void watch(long confToken, A6Conf a6Conf) {
+ config = a6Conf.getConfig();
+ }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
new file mode 100644
index 0000000..c3c0abe
--- /dev/null
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
@@ -0,0 +1,17 @@
+package org.apache.apisix.plugin.runner;
+
+public interface A6ConfigWatcher {
+ /**
+ * @return the name of config watcher
+ */
+ String name();
+
+ /**
+ * watch the change of the config
+ *
+ * @param confToken the config token
+ * @param a6Conf the config
+ */
+ default void watch(long confToken, A6Conf a6Conf) {
+ }
+}