You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by tz...@apache.org on 2022/10/31 10:33:38 UTC
[apisix-java-plugin-runner] branch main updated: feat: support watching config changes (#208)
This is an automated email from the ASF dual-hosted git repository.
tzssangglass pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/apisix-java-plugin-runner.git
The following commit(s) were added to refs/heads/main by this push:
new 9c181d8 feat: support watching config changes (#208)
9c181d8 is described below
commit 9c181d814c903351e4bc092e30b33d7a5f159b4a
Author: JunXu Chen <ch...@gmail.com>
AuthorDate: Mon Oct 31 18:33:34 2022 +0800
feat: support watching config changes (#208)
Co-authored-by: github-actions[bot] <41...@users.noreply.github.com>
---
.../plugin/runner/handler/PrepareConfHandler.java | 5 +++
.../plugin/runner/server/ApplicationRunner.java | 20 ++++++---
.../plugin/runner/handler/A6ConfigHandlerTest.java | 30 ++++++++-----
.../runner/handler/A6HttpCallHandlerTest.java | 9 +++-
.../plugin/runner/handler/ExtraInfoTest.java | 9 +++-
.../plugin/runner/handler/PostFilterTest.java | 10 ++++-
.../apisix/plugin/runner/handler/TestWatcher.java | 49 ++++++++++++++++++++++
.../apisix/plugin/runner/A6ConfigWatcher.java | 36 ++++++++++++++++
8 files changed, 146 insertions(+), 22 deletions(-)
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
index 799208a..6a7e9fa 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java
@@ -26,6 +26,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;
import org.apache.apisix.plugin.runner.constants.Constants;
@@ -48,6 +49,7 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
private final Cache<Long, A6Conf> cache;
private final Map<String, PluginFilter> filters;
+ private final List<A6ConfigWatcher> watchers;
@Override
protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
@@ -75,6 +77,9 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
}
A6Conf a6Conf = new A6Conf(config, chain);
cache.put(token, a6Conf);
+ for (A6ConfigWatcher watcher : watchers) {
+ watcher.watch(token, a6Conf);
+ }
ctx.write(response);
ctx.writeAndFlush(response);
}
diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
index 8c6ae48..865983a 100644
--- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
+++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java
@@ -47,7 +47,9 @@ import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
+
import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
import org.apache.apisix.plugin.runner.handler.RpcCallHandler;
@@ -67,21 +69,27 @@ public class ApplicationRunner implements CommandLineRunner {
private Cache<Long, A6Conf> cache;
- private ObjectProvider<PluginFilter> beanProvider;
+ private ObjectProvider<PluginFilter> filterProvider;
+ private ObjectProvider<A6ConfigWatcher> watcherProvider;
@Autowired
- public ApplicationRunner(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+ public ApplicationRunner(Cache<Long, A6Conf> cache,
+ ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
this.cache = cache;
- this.beanProvider = beanProvider;
+ this.filterProvider = filterProvider;
+ this.watcherProvider = watcherProvider;
}
- public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
+ public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache,
+ ObjectProvider<PluginFilter> beanProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
Map<String, PluginFilter> filterMap = new HashMap<>();
for (PluginFilter filter : pluginFilterList) {
filterMap.put(filter.name(), filter);
}
- return new PrepareConfHandler(cache, filterMap);
+ List<A6ConfigWatcher> configWatcherList = watcherProvider.orderedStream().collect(Collectors.toList());
+
+ return new PrepareConfHandler(cache, filterMap, configWatcherList);
}
public RpcCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
@@ -123,7 +131,7 @@ public class ApplicationRunner implements CommandLineRunner {
.addAfter("logger", "payloadEncoder", new PayloadEncoder())
.addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
.addLast("payloadDecoder", new PayloadDecoder())
- .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
+ .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider))
.addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
.addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
index 0f33d68..b1a09ab 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java
@@ -17,6 +17,16 @@
package org.apache.apisix.plugin.runner.handler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.flatbuffers.FlatBufferBuilder;
@@ -27,19 +37,11 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
@DisplayName("test add filter")
class A6ConfigHandlerTest {
@@ -48,8 +50,12 @@ class A6ConfigHandlerTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
PrepareConfHandler prepareConfHandler;
+ TestWatcher tWatcher = new TestWatcher() ;
+
@BeforeEach
void setUp() {
filters = new HashMap<>();
@@ -96,8 +102,10 @@ class A6ConfigHandlerTest {
return null;
}
});
+ watchers = new ArrayList<>();
+ watchers.add(tWatcher);
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
}
@Test
@@ -124,6 +132,8 @@ class A6ConfigHandlerTest {
Assertions.assertEquals(config.getChain().getFilters().size(), 1);
Assertions.assertEquals(config.getChain().getIndex(), 0);
Assertions.assertEquals(config.get("FooFilter"), "Bar");
+ Assertions.assertEquals(tWatcher.getConfig(), config.getConfig());
+ Assertions.assertEquals(tWatcher.getToken(), response.getConfToken());
}
@Test
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
index b6a81e2..d7360cb 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java
@@ -28,6 +28,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.A6ErrResponse;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,6 +60,8 @@ class A6HttpCallHandlerTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -149,6 +153,7 @@ class A6HttpCallHandlerTest {
return null;
}
});
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -167,14 +172,14 @@ class A6HttpCallHandlerTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
index d921858..fafc4e4 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java
@@ -26,6 +26,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.ExtraInfoRequest;
import org.apache.apisix.plugin.runner.ExtraInfoResponse;
import org.apache.apisix.plugin.runner.HttpRequest;
@@ -41,6 +42,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,6 +63,8 @@ class ExtraInfoTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -127,6 +131,7 @@ class ExtraInfoTest {
return true;
}
});
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -145,14 +150,14 @@ class ExtraInfoTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
index e2de6dc..98b7e9e 100644
--- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java
@@ -25,6 +25,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.apisix.plugin.runner.A6Conf;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
import org.apache.apisix.plugin.runner.PostRequest;
import org.apache.apisix.plugin.runner.PostResponse;
import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -37,7 +38,9 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -52,6 +55,8 @@ public class PostFilterTest {
Map<String, PluginFilter> filters;
+ List<A6ConfigWatcher> watchers;
+
EmbeddedChannel channel;
PrepareConfHandler prepareConfHandler;
@@ -86,6 +91,7 @@ public class PostFilterTest {
}
}
);
+ watchers = new ArrayList<>();
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
FlatBufferBuilder builder = new FlatBufferBuilder();
@@ -100,14 +106,14 @@ public class PostFilterTest {
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
A6ConfigRequest request = new A6ConfigRequest(req);
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
channel.writeInbound(request);
channel.finish();
A6ConfigResponse response = channel.readOutbound();
confToken = response.getConfToken();
- prepareConfHandler = new PrepareConfHandler(cache, filters);
+ prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
rpcCallHandler = new RpcCallHandler(cache);
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
}
diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
new file mode 100644
index 0000000..8c5b18d
--- /dev/null
+++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestWatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apisix.plugin.runner.handler;
+
+import java.util.Map;
+
+import org.apache.apisix.plugin.runner.A6Conf;
+import org.apache.apisix.plugin.runner.A6ConfigWatcher;
+
+class TestWatcher implements A6ConfigWatcher {
+ private Map<String, String> config;
+ private long token;
+
+ public Map<String, String> getConfig() {
+ return config;
+ }
+
+ public long getToken() {
+ return token;
+ }
+
+ @Override
+ public String name() {
+ return "test";
+ }
+
+ @Override
+ public void watch(long confToken, A6Conf a6Conf) {
+ config = a6Conf.getConfig();
+ token = confToken;
+ }
+}
diff --git a/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
new file mode 100644
index 0000000..87189d6
--- /dev/null
+++ b/runner-plugin-sdk/src/main/java/org/apache/apisix/plugin/runner/A6ConfigWatcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apisix.plugin.runner;
+
+public interface A6ConfigWatcher {
+ /**
+ * @return the name of config watcher
+ */
+ String name();
+
+ /**
+ * watch the change of the config
+ *
+ * @param confToken the config token
+ * @param a6Conf the config
+ */
+ default void watch(long confToken, A6Conf a6Conf) {
+ }
+}