You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2021/03/08 17:28:07 UTC

[aries-component-dsl] 02/10: Add refreshAsUpdate

This is an automated email from the ASF dual-hosted git repository.

csierra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-component-dsl.git

commit e2d1c3def22679e9161f8a3735544545a42297ac
Author: Carlos Sierra Andrés <ca...@liferay.com>
AuthorDate: Mon Mar 8 14:10:02 2021 +0100

    Add refreshAsUpdate
    
    to allow to _recalculate_ parts of the _tree_ and treat them as
    updates afterwards
---
 .../java/org/apache/aries/component/dsl/OSGi.java  | 53 +++++++++++++++++
 .../component/dsl/internal/UpdateSupport.java      |  4 ++
 .../apache/aries/component/dsl/test/DSLTest.java   | 67 ++++++++++++++++++++++
 3 files changed, 124 insertions(+)

diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index b935efa..de175db 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -454,6 +454,59 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 		return new RecoverWithOSGi<>(program, function);
 	}
 
+	static <T> OSGi<T> refreshAsUpdates(OSGi<T> program) {
+		return program.transform(op -> {
+			class ResultState {
+				boolean gone;
+				OSGiResult result;
+
+				public ResultState(boolean gone, OSGiResult result) {
+					this.gone = gone;
+					this.result = result;
+				}
+			}
+
+			ThreadLocal<ResultState> threadLocal = ThreadLocal.withInitial(() -> null);
+
+			return t -> {
+				AtomicReference<OSGiResult> atomicReference = new AtomicReference<>(NOOP);
+
+				if (!UpdateSupport.isUpdate()) {
+					atomicReference.set(op.publish(t));
+				}
+				else {
+					threadLocal.get().gone = false;
+				}
+
+				return new OSGiResultImpl(
+					() -> {
+						if (!UpdateSupport.isUpdate()) {
+							atomicReference.getAndSet(NOOP).run();
+						}
+						else {
+							threadLocal.set(new ResultState(true, atomicReference.get()));
+
+							UpdateSupport.deferTermination(
+								() -> {
+									if (threadLocal.get().gone) {
+										threadLocal.get().result.run();
+
+										threadLocal.remove();
+										atomicReference.set(NOOP);
+									}
+									else {
+										threadLocal.get().result.update();
+									}
+								}
+							);
+						}
+					},
+					() -> atomicReference.get().update()
+				);
+			};
+		});
+	}
+
 	static <T> OSGi<T> refreshWhen(OSGi<T> program, Predicate<T> refresher) {
 		return new RefreshWhenOSGi<>(program, refresher);
 	}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
index 5008bf9..253ca30 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/UpdateSupport.java
@@ -32,6 +32,10 @@ public class UpdateSupport {
     private static final ThreadLocal<Boolean> isUpdate =
         ThreadLocal.withInitial(() -> Boolean.FALSE);
 
+    public static boolean isUpdate() {
+        return isUpdate.get();
+    }
+
     public static void deferPublication(Runnable runnable) {
         if (isUpdate.get()) {
             deferredPublishersStack.get().peekLast().addLast(runnable);
diff --git a/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java b/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
index f98cd4a..0dc4feb 100644
--- a/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
+++ b/itests/src/main/java/org/apache/aries/component/dsl/test/DSLTest.java
@@ -1921,6 +1921,73 @@ public class DSLTest {
         }
     }
 
+    @Test
+    public void testServiceReferenceWithFilterUpdates() {
+        AtomicReference<String> atomicReference = new AtomicReference<>();
+
+        ServiceRegistration<Service> serviceRegistration =
+            bundleContext.registerService(
+                Service.class, new Service(),
+                new Hashtable<String, Object>() {{
+                    put("property", "original");
+                    put("admissible", "true");
+                }});
+
+        AtomicInteger atomicInteger = new AtomicInteger();
+
+        try {
+            OSGi<?> program =
+                refreshAsUpdates(
+                    serviceReferences(
+                        Service.class
+                    ).filter(
+                        sr -> sr.getProperty("admissible").equals("true")
+                    )
+                ).map(
+                    CachingServiceReference::getServiceReference
+                ).effects(
+                    sr -> {
+                        atomicReference.set(
+                            String.valueOf(sr.getProperty("property")));
+
+                        atomicInteger.incrementAndGet();
+                    },
+                    __ -> {},
+                    __ -> {},
+                    __ -> atomicReference.set(null),
+                    sr ->
+                        atomicReference.set(
+                            String.valueOf(sr.getProperty("property")))
+                );
+
+            program.run(bundleContext);
+
+            assertEquals(1, atomicInteger.get());
+            assertEquals("original", atomicReference.get());
+
+            serviceRegistration.setProperties(
+                new Hashtable<String, Object>() {{
+                    put("property", "updated");
+                    put("admissible", "true");
+                }});
+
+            assertEquals(1, atomicInteger.get());
+            assertEquals("updated", atomicReference.get());
+
+            serviceRegistration.setProperties(
+                new Hashtable<String, Object>() {{
+                    put("property", "updated");
+                    put("admissible", "false");
+                }});
+
+            assertEquals(1, atomicInteger.get());
+            assertNull(atomicReference.get());
+        }
+        finally {
+            serviceRegistration.unregister();
+        }
+    }
+
 
     @Test
     public void testServiceReferenceUpdatesWithSelector() {