You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by ch...@apache.org on 2022/10/31 09:53:43 UTC

[apisix-java-plugin-runner] 01/01: feat: support watching config for filters

This is an automated email from the ASF dual-hosted git repository.

chenjunxu pushed a commit to branch feat/watch
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git

commit f5ea54f4964caed618753a30db0936c0aae02819
Author: nic-chen <ch...@gmail.com>
AuthorDate: Mon Oct 31 17:53:34 2022 +0800

    feat: support watching config for filters
---
 .../plugin/runner/handler/PrepareConfHandler.java  |  5 ++++
 .../plugin/runner/server/ApplicationRunner.java    | 20 ++++++++++-----
 .../plugin/runner/handler/A6ConfigHandlerTest.java | 29 ++++++++++++++--------
 .../runner/handler/A6HttpCallHandlerTest.java      |  9 +++++--
 .../plugin/runner/handler/ExtraInfoTest.java       |  9 +++++--
 .../plugin/runner/handler/PostFilterTest.java      | 10 ++++++--
 .../apisix/plugin/runner/handler/TestWatcher.java  | 24 ++++++++++++++++++
 .../apisix/plugin/runner/A6ConfigWatcher.java      | 17 +++++++++++++
 8 files changed, 101 insertions(+), 22 deletions(-)

diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
index 799208a..6a7e9fa 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -26,6 +26,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.A6Request;
 import org.apache.apisix.plugin.runner.A6Response;
 import org.apache.apisix.plugin.runner.constants.Constants;
@@ -48,6 +49,7 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
 
     private final Cache<Long, A6Conf> cache;
     private final Map<String, PluginFilter> filters;
+    private final List<A6ConfigWatcher> watchers;
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
@@ -75,6 +77,9 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
         }
         A6Conf a6Conf = new A6Conf(config, chain);
         cache.put(token, a6Conf);
+        for (A6ConfigWatcher watcher : watchers) {
+            watcher.watch(token, a6Conf);
+        }
         ctx.write(response);
         ctx.writeAndFlush(response);
     }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
index 8c6ae48..865983a 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
@@ -47,7 +47,9 @@ import io.netty.channel.unix.DomainSocketAddress;
 import io.netty.channel.unix.DomainSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
 import lombok.RequiredArgsConstructor;
+
 import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
 import org.apache.apisix.plugin.runner.handler.RpcCallHandler;
@@ -67,21 +69,27 @@ public class ApplicationRunner implements CommandLineRunner {
 
     private Cache<Long, A6Conf> cache;
 
-    private ObjectProvider<PluginFilter> beanProvider;
+    private ObjectProvider<PluginFilter> filterProvider;
+    private ObjectProvider<A6ConfigWatcher> watcherProvider;
 
     @Autowired
-    public ApplicationRunner(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+    public ApplicationRunner(Cache<Long, A6Conf> cache,
+            ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
         this.cache = cache;
-        this.beanProvider = beanProvider;
+        this.filterProvider = filterProvider;
+        this.watcherProvider = watcherProvider;
     }
 
-    public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+    public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache,
+            ObjectProvider<PluginFilter> beanProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
         List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
         Map<String, PluginFilter> filterMap = new HashMap<>();
         for (PluginFilter filter : pluginFilterList) {
             filterMap.put(filter.name(), filter);
         }
-        return new PrepareConfHandler(cache, filterMap);
+        List<A6ConfigWatcher> configWatcherList = watcherProvider.orderedStream().collect(Collectors.toList());
+
+        return new PrepareConfHandler(cache, filterMap, configWatcherList);
     }
 
     public RpcCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
@@ -123,7 +131,7 @@ public class ApplicationRunner implements CommandLineRunner {
                         .addAfter("logger", "payloadEncoder", new PayloadEncoder())
                         .addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
                         .addLast("payloadDecoder", new PayloadDecoder())
-                        .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
+                        .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider))
                         .addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
                         .addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());
 
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
index 0f33d68..ced6711 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
@@ -17,6 +17,16 @@
 
 package org.apache.apisix.plugin.runner.handler;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.flatbuffers.FlatBufferBuilder;
@@ -27,19 +37,11 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @DisplayName("test add filter")
 class A6ConfigHandlerTest {
@@ -48,8 +50,12 @@ class A6ConfigHandlerTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     PrepareConfHandler prepareConfHandler;
 
+    TestWatcher tWatcher = new TestWatcher() ;
+
     @BeforeEach
     void setUp() {
         filters = new HashMap<>();
@@ -96,8 +102,10 @@ class A6ConfigHandlerTest {
                 return null;
             }
         });
+        watchers = new ArrayList<>();
+        watchers.add(tWatcher);
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
     }
 
     @Test
@@ -124,6 +132,7 @@ class A6ConfigHandlerTest {
         Assertions.assertEquals(config.getChain().getFilters().size(), 1);
         Assertions.assertEquals(config.getChain().getIndex(), 0);
         Assertions.assertEquals(config.get("FooFilter"), "Bar");
+        Assertions.assertEquals(tWatcher.getConfig(), config.getConfig());
     }
 
     @Test
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
index b6a81e2..d7360cb 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
@@ -28,6 +28,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +60,8 @@ class A6HttpCallHandlerTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -149,6 +153,7 @@ class A6HttpCallHandlerTest {
                 return null;
             }
         });
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -167,14 +172,14 @@ class A6HttpCallHandlerTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
index d921858..fafc4e4 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
@@ -26,6 +26,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.ExtraInfoRequest;
 import org.apache.apisix.plugin.runner.ExtraInfoResponse;
 import org.apache.apisix.plugin.runner.HttpRequest;
@@ -41,6 +42,7 @@ import org.springframework.test.util.ReflectionTestUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +63,8 @@ class ExtraInfoTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -127,6 +131,7 @@ class ExtraInfoTest {
                 return true;
             }
         });
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -145,14 +150,14 @@ class ExtraInfoTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
index e2de6dc..98b7e9e 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.PostRequest;
 import org.apache.apisix.plugin.runner.PostResponse;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -37,7 +38,9 @@ import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -52,6 +55,8 @@ public class PostFilterTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -86,6 +91,7 @@ public class PostFilterTest {
                     }
                 }
         );
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -100,14 +106,14 @@ public class PostFilterTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
new file mode 100644
index 0000000..e01d41c
--- /dev/null
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
@@ -0,0 +1,24 @@
+package org.apache.apisix.plugin.runner.handler;
+
+import java.util.Map;
+
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
+
+class TestWatcher implements A6ConfigWatcher {
+    public Map<String, String> config;
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    @Override
+    public String name() {
+        return "test";
+    }
+
+    @Override
+    public void watch(long confToken, A6Conf a6Conf) {
+        config = a6Conf.getConfig();
+    }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
new file mode 100644
index 0000000..c3c0abe
--- /dev/null
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
@@ -0,0 +1,17 @@
+package org.apache.apisix.plugin.runner;
+
+public interface A6ConfigWatcher {
+    /**
+     * @return the name of config watcher
+     */
+    String name();
+
+    /**
+     * watch the change of the config
+     *
+     * @param confToken  the config token
+     * @param a6Conf the config
+     */
+    default void watch(long confToken, A6Conf a6Conf) {
+    }
+}