You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2021/03/08 17:28:07 UTC
[aries-component-dsl] 02/10: Add refreshAsUpdate
This is an automated email from the ASF dual-hosted git repository.
csierra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-component-dsl.git
commit e2d1c3def22679e9161f8a3735544545a42297ac
Author: Carlos Sierra Andrés <ca...@liferay.com>
AuthorDate: Mon Mar 8 14:10:02 2021 +0100
Add refreshAsUpdate
to allow parts of the _tree_ to be _recalculated_ and then treated as
updates afterwards
---
.../java/org/apache/aries/component/dsl/OSGi.java | 53 +++++++++++++++++
.../component/dsl/internal/UpdateSupport.java | 4 ++
.../apache/aries/component/dsl/test/DSLTest.java | 67 ++++++++++++++++++++++
3 files changed, 124 insertions(+)
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index b935efa..de175db 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -454,6 +454,59 @@ public interface OSGi<T> extends OSGiRunnable<T> {
return new RecoverWithOSGi<>(program, function);
}
+ static <T> OSGi<T> refreshAsUpdates(OSGi<T> program) { // Wraps program with transform(...): a close made while UpdateSupport.isUpdate() is true is parked, and if a publication arrives in update mode before the deferred termination runs, the parked result gets update() instead of run().
+ return program.transform(op -> {
+ class ResultState { // Mutable holder for a parked result and whether it is still considered gone.
+ boolean gone; // Set to true when parked; cleared by a publication made in update mode.
+ OSGiResult result; // The downstream result captured when the close ran in update mode.
+
+ public ResultState(boolean gone, OSGiResult result) {
+ this.gone = gone;
+ this.result = result;
+ }
+ }
+
+ ThreadLocal<ResultState> threadLocal = ThreadLocal.withInitial(() -> null); // One per-thread slot shared by every value published through this transform; starts out null.
+
+ return t -> {
+ AtomicReference<OSGiResult> atomicReference = new AtomicReference<>(NOOP); // This publication's downstream result; stays NOOP unless op.publish(t) is called below.
+
+ if (!UpdateSupport.isUpdate()) {
+ atomicReference.set(op.publish(t)); // Outside an update: publish downstream and remember the result.
+ }
+ else {
+ threadLocal.get().gone = false; // Inside an update: op.publish is NOT called; only the parked state is marked as not gone. NOTE(review): threadLocal.get() is null unless a close in update mode ran earlier on this thread, which would make this line throw NullPointerException - confirm that ordering is guaranteed. NOTE(review): atomicReference stays NOOP for this publication - confirm this is intended.
+ }
+
+ return new OSGiResultImpl(
+ () -> { // First runnable handed to OSGiResultImpl (presumably the close action - confirm against OSGiResultImpl).
+ if (!UpdateSupport.isUpdate()) {
+ atomicReference.getAndSet(NOOP).run(); // Outside an update: run the downstream result and swap in NOOP so a repeated call does nothing.
+ }
+ else {
+ threadLocal.set(new ResultState(true, atomicReference.get())); // Inside an update: park the current result, flagged as gone, in the thread-local slot.
+
+ UpdateSupport.deferTermination( // The decision below is handed to UpdateSupport; when the runnable executes is determined there (not visible in this block).
+ () -> {
+ if (threadLocal.get().gone) { // Flag was never cleared: no publication in update mode happened on this thread in between.
+ threadLocal.get().result.run();
+
+ threadLocal.remove(); // Clear the slot only on this branch; the else branch leaves the parked state in place.
+ atomicReference.set(NOOP);
+ }
+ else {
+ threadLocal.get().result.update(); // Flag was cleared by a publication in update mode: notify the parked result via update() instead of running it.
+ }
+ }
+ );
+ }
+ },
+ () -> atomicReference.get().update() // Second runnable: forwards update() to whatever result this publication currently holds.
+ );
+ };
+ });
+ }
+
static <T> OSGi<T> refreshWhen(OSGi<T> program, Predicate<T> refresher) { // Builds a RefreshWhenOSGi from the given program and refresher predicate.
return new RefreshWhenOSGi<>(program, refresher);
}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
index 5008bf9..253ca30 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
@@ -32,6 +32,10 @@ public class UpdateSupport {
private static final ThreadLocal<Boolean> isUpdate =
ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static boolean isUpdate() { // Exposes the calling thread's value of the private isUpdate thread-local, whose initial value is Boolean.FALSE.
+ return isUpdate.get();
+ }
+
public static void deferPublication(Runnable runnable) {
if (isUpdate.get()) {
deferredPublishersStack.get().peekLast().addLast(runnable);
diff --git a/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java b/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
index f98cd4a..0dc4feb 100644
--- a/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
+++ b/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
@@ -1921,6 +1921,73 @@ public class DSLTest {
}
}
+ @Test
+ public void testServiceReferenceWithFilterUpdates() { // Registers a service, runs a refreshAsUpdates(serviceReferences(...).filter(...)) program over it, then changes the service properties twice and checks the recorded value and the invocation counter after each change.
+ AtomicReference<String> atomicReference = new AtomicReference<>(); // Last "property" value recorded by the effects callbacks, or null.
+
+ ServiceRegistration<Service> serviceRegistration =
+ bundleContext.registerService(
+ Service.class, new Service(),
+ new Hashtable<String, Object>() {{
+ put("property", "original");
+ put("admissible", "true");
+ }});
+
+ AtomicInteger atomicInteger = new AtomicInteger(); // Counts how many times the first effects callback runs.
+
+ try {
+ OSGi<?> program =
+ refreshAsUpdates(
+ serviceReferences(
+ Service.class
+ ).filter(
+ sr -> sr.getProperty("admissible").equals("true") // Only references whose "admissible" property equals "true" pass the filter.
+ )
+ ).map(
+ CachingServiceReference::getServiceReference
+ ).effects(
+ sr -> { // First callback: records the property value and bumps the counter.
+ atomicReference.set(
+ String.valueOf(sr.getProperty("property")));
+
+ atomicInteger.incrementAndGet();
+ },
+ __ -> {},
+ __ -> {},
+ __ -> atomicReference.set(null), // Fourth callback: the only place the recorded value is reset to null.
+ sr -> // Fifth callback: re-records the property value without touching the counter.
+ atomicReference.set(
+ String.valueOf(sr.getProperty("property")))
+ );
+
+ program.run(bundleContext);
+
+ assertEquals(1, atomicInteger.get()); // First callback ran exactly once for the initial registration.
+ assertEquals("original", atomicReference.get());
+
+ serviceRegistration.setProperties( // Change "property" while keeping the service admissible.
+ new Hashtable<String, Object>() {{
+ put("property", "updated");
+ put("admissible", "true");
+ }});
+
+ assertEquals(1, atomicInteger.get()); // Counter unchanged: the first callback was not re-run for the property change.
+ assertEquals("updated", atomicReference.get()); // The new value was still recorded.
+
+ serviceRegistration.setProperties( // Now make the service fail the filter.
+ new Hashtable<String, Object>() {{
+ put("property", "updated");
+ put("admissible", "false");
+ }});
+
+ assertEquals(1, atomicInteger.get());
+ assertNull(atomicReference.get()); // The recorded value was cleared once the service stopped passing the filter.
+ }
+ finally {
+ serviceRegistration.unregister(); // Always remove the test service, even if an assertion failed.
+ }
+ }
+
@Test
public void testServiceReferenceUpdatesWithSelector() {