You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by tz...@apache.org on 2022/10/31 10:33:38 UTC

[apisix-java-plugin-runner] branch main updated: feat: support watching config changes (#208)

This is an automated email from the ASF dual-hosted git repository.

tzssangglass pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c181d8  feat: support watching config changes (#208)
9c181d8 is described below

commit 9c181d814c903351e4bc092e30b33d7a5f159b4a
Author: JunXu Chen <ch...@gmail.com>
AuthorDate: Mon Oct 31 18:33:34 2022 +0800

    feat: support watching config changes (#208)
    
    Co-authored-by: github-actions[bot] <41...@users.noreply.github.com>
---
 .../plugin/runner/handler/PrepareConfHandler.java  |  5 +++
 .../plugin/runner/server/ApplicationRunner.java    | 20 ++++++---
 .../plugin/runner/handler/A6ConfigHandlerTest.java | 30 ++++++++-----
 .../runner/handler/A6HttpCallHandlerTest.java      |  9 +++-
 .../plugin/runner/handler/ExtraInfoTest.java       |  9 +++-
 .../plugin/runner/handler/PostFilterTest.java      | 10 ++++-
 .../apisix/plugin/runner/handler/TestWatcher.java  | 49 ++++++++++++++++++++++
 .../apisix/plugin/runner/A6ConfigWatcher.java      | 36 ++++++++++++++++
 8 files changed, 146 insertions(+), 22 deletions(-)

diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
index 799208a..6a7e9fa 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -26,6 +26,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.A6Request;
 import org.apache.apisix.plugin.runner.A6Response;
 import org.apache.apisix.plugin.runner.constants.Constants;
@@ -48,6 +49,7 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
 
     private final Cache<Long, A6Conf> cache;
     private final Map<String, PluginFilter> filters;
+    private final List<A6ConfigWatcher> watchers;
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
@@ -75,6 +77,9 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
         }
         A6Conf a6Conf = new A6Conf(config, chain);
         cache.put(token, a6Conf);
+        for (A6ConfigWatcher watcher : watchers) {
+            watcher.watch(token, a6Conf);
+        }
         ctx.write(response);
         ctx.writeAndFlush(response);
     }
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
index 8c6ae48..865983a 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
@@ -47,7 +47,9 @@ import io.netty.channel.unix.DomainSocketAddress;
 import io.netty.channel.unix.DomainSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
 import lombok.RequiredArgsConstructor;
+
 import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
 import org.apache.apisix.plugin.runner.handler.RpcCallHandler;
@@ -67,21 +69,27 @@ public class ApplicationRunner implements CommandLineRunner {
 
     private Cache<Long, A6Conf> cache;
 
-    private ObjectProvider<PluginFilter> beanProvider;
+    private ObjectProvider<PluginFilter> filterProvider;
+    private ObjectProvider<A6ConfigWatcher> watcherProvider;
 
     @Autowired
-    public ApplicationRunner(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+    public ApplicationRunner(Cache<Long, A6Conf> cache,
+            ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
         this.cache = cache;
-        this.beanProvider = beanProvider;
+        this.filterProvider = filterProvider;
+        this.watcherProvider = watcherProvider;
     }
 
-    public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+    public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache,
+            ObjectProvider<PluginFilter> beanProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
         List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
         Map<String, PluginFilter> filterMap = new HashMap<>();
         for (PluginFilter filter : pluginFilterList) {
             filterMap.put(filter.name(), filter);
         }
-        return new PrepareConfHandler(cache, filterMap);
+        List<A6ConfigWatcher> configWatcherList = watcherProvider.orderedStream().collect(Collectors.toList());
+
+        return new PrepareConfHandler(cache, filterMap, configWatcherList);
     }
 
     public RpcCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
@@ -123,7 +131,7 @@ public class ApplicationRunner implements CommandLineRunner {
                         .addAfter("logger", "payloadEncoder", new PayloadEncoder())
                         .addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
                         .addLast("payloadDecoder", new PayloadDecoder())
-                        .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
+                        .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider))
                         .addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
                         .addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());
 
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
index 0f33d68..b1a09ab 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
@@ -17,6 +17,16 @@
 
 package org.apache.apisix.plugin.runner.handler;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.flatbuffers.FlatBufferBuilder;
@@ -27,19 +37,11 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
 import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @DisplayName("test add filter")
 class A6ConfigHandlerTest {
@@ -48,8 +50,12 @@ class A6ConfigHandlerTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     PrepareConfHandler prepareConfHandler;
 
+    TestWatcher tWatcher = new TestWatcher() ;
+
     @BeforeEach
     void setUp() {
         filters = new HashMap<>();
@@ -96,8 +102,10 @@ class A6ConfigHandlerTest {
                 return null;
             }
         });
+        watchers = new ArrayList<>();
+        watchers.add(tWatcher);
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
     }
 
     @Test
@@ -124,6 +132,8 @@ class A6ConfigHandlerTest {
         Assertions.assertEquals(config.getChain().getFilters().size(), 1);
         Assertions.assertEquals(config.getChain().getIndex(), 0);
         Assertions.assertEquals(config.get("FooFilter"), "Bar");
+        Assertions.assertEquals(tWatcher.getConfig(), config.getConfig());
+        Assertions.assertEquals(tWatcher.getToken(), response.getConfToken());
     }
 
     @Test
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
index b6a81e2..d7360cb 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
@@ -28,6 +28,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.A6ErrResponse;
 import org.apache.apisix.plugin.runner.HttpRequest;
 import org.apache.apisix.plugin.runner.HttpResponse;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +60,8 @@ class A6HttpCallHandlerTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -149,6 +153,7 @@ class A6HttpCallHandlerTest {
                 return null;
             }
         });
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -167,14 +172,14 @@ class A6HttpCallHandlerTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
index d921858..fafc4e4 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
@@ -26,6 +26,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.ExtraInfoRequest;
 import org.apache.apisix.plugin.runner.ExtraInfoResponse;
 import org.apache.apisix.plugin.runner.HttpRequest;
@@ -41,6 +42,7 @@ import org.springframework.test.util.ReflectionTestUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +63,8 @@ class ExtraInfoTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -127,6 +131,7 @@ class ExtraInfoTest {
                 return true;
             }
         });
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -145,14 +150,14 @@ class ExtraInfoTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
index e2de6dc..98b7e9e 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.apisix.plugin.runner.A6Conf;
 import org.apache.apisix.plugin.runner.A6ConfigRequest;
 import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
 import org.apache.apisix.plugin.runner.PostRequest;
 import org.apache.apisix.plugin.runner.PostResponse;
 import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -37,7 +38,9 @@ import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -52,6 +55,8 @@ public class PostFilterTest {
 
     Map<String, PluginFilter> filters;
 
+    List<A6ConfigWatcher> watchers;
+
     EmbeddedChannel channel;
 
     PrepareConfHandler prepareConfHandler;
@@ -86,6 +91,7 @@ public class PostFilterTest {
                     }
                 }
         );
+        watchers = new ArrayList<>();
         cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
         FlatBufferBuilder builder = new FlatBufferBuilder();
 
@@ -100,14 +106,14 @@ public class PostFilterTest {
         io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
 
         A6ConfigRequest request = new A6ConfigRequest(req);
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
         channel.writeInbound(request);
         channel.finish();
         A6ConfigResponse response = channel.readOutbound();
         confToken = response.getConfToken();
 
-        prepareConfHandler = new PrepareConfHandler(cache, filters);
+        prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
         rpcCallHandler = new RpcCallHandler(cache);
         channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
     }
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
new file mode 100644
index 0000000..8c5b18d
--- /dev/null
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import java.util.Map;
+
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
+
+class TestWatcher implements A6ConfigWatcher {
+    private Map<String, String> config;
+    private long token;
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public long getToken() {
+        return token;
+    }
+
+    @Override
+    public String name() {
+        return "test";
+    }
+
+    @Override
+    public void watch(long confToken, A6Conf a6Conf) {
+        config = a6Conf.getConfig();
+        token = confToken;
+    }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
new file mode 100644
index 0000000..87189d6
--- /dev/null
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apisix.plugin.runner;
+
+public interface A6ConfigWatcher {
+    /**
+     * @return the name of config watcher
+     */
+    String name();
+
+    /**
+     * watch the change of the config
+     *
+     * @param confToken  the config token
+     * @param a6Conf the config
+     */
+    default void watch(long confToken, A6Conf a6Conf) {
+    }
+}