You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2021/03/03 16:44:03 UTC

[aries-component-dsl] 03/09: Add refresh support for update signals

This is an automated email from the ASF dual-hosted git repository.

csierra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-component-dsl.git

commit e4b756c50c224f2542b19eb5d636267c981511c6
Author: Carlos Sierra Andrés <ca...@liferay.com>
AuthorDate: Fri Feb 26 08:26:08 2021 +0100

    Add refresh support for update signals
---
 .../java/org/apache/aries/component/dsl/OSGi.java  | 31 ++++++++----
 .../org/apache/aries/component/dsl/OSGiResult.java |  4 +-
 .../dsl/internal/AggregateOSGiResult.java          |  8 ++-
 .../aries/component/dsl/internal/AllOSGi.java      |  2 +-
 .../aries/component/dsl/internal/BaseOSGiImpl.java | 29 ++++++-----
 .../aries/component/dsl/internal/BundleOSGi.java   |  6 ++-
 .../component/dsl/internal/CoalesceOSGiImpl.java   | 11 ++--
 .../dsl/internal/ConfigurationOSGiImpl.java        |  2 +-
 .../dsl/internal/ConfigurationsOSGiImpl.java       | 12 ++---
 .../component/dsl/internal/DistributeOSGiImpl.java |  6 ++-
 .../aries/component/dsl/internal/EffectsOSGi.java  |  2 +-
 .../component/dsl/internal/HighestRankingOSGi.java |  2 +-
 .../aries/component/dsl/internal/JustOSGiImpl.java | 10 ++--
 .../component/dsl/internal/NothingOSGiImpl.java    |  2 +-
 .../component/dsl/internal/OSGiResultImpl.java     | 13 +++--
 .../apache/aries/component/dsl/internal/Pad.java   |  4 +-
 .../component/dsl/internal/RefreshWhenOSGi.java    | 58 ++++++++++++++++++++++
 .../dsl/internal/ServiceReferenceOSGi.java         | 33 ++++--------
 18 files changed, 157 insertions(+), 78 deletions(-)

diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index 1adfba9..fc6597c 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -45,6 +45,7 @@ import org.apache.aries.component.dsl.function.Function3;
 import org.apache.aries.component.dsl.function.Function5;
 import org.apache.aries.component.dsl.function.Function7;
 import org.apache.aries.component.dsl.update.UpdateQuery;
+import org.apache.aries.component.dsl.update.UpdateSelector;
 import org.apache.aries.component.dsl.update.UpdateTuple;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
@@ -56,11 +57,7 @@ import org.osgi.framework.ServiceRegistration;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
+import java.util.function.*;
 
 /**
  * @author Carlos Sierra Andrés
@@ -331,7 +328,12 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 	}
 
 	static <T> OSGi<T> fromOsgiRunnable(OSGiRunnable<T> runnable) {
-		return getOsgiFactory().create(runnable);
+		return getOsgiFactory().create(
+			(ec, op) -> new OSGiResultImpl(runnable.run(ec, op), __ -> true));
+	}
+
+	static <T> OSGi<T> fromOsgiRunnableWithUpdateSupport(OSGiRunnable<T> runnable) {
+		return getOsgiFactory().create(runnable::run);
 	}
 
 	static OSGiFactory getOsgiFactory() {
@@ -449,6 +451,10 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 		return new RecoverWithOSGi<>(program, function);
 	}
 
+	static <T> OSGi<T> refreshWhen(OSGi<T> program, BiPredicate<UpdateSelector, T> refresher) {
+		return new RefreshWhenOSGi<>(program, refresher);
+	}
+
 	static <T> OSGi<ServiceRegistration<T>> register(
 		Class<T> clazz, T service, Map<String, Object> properties) {
 
@@ -544,21 +550,28 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 		Class<T> clazz, String filterString,
 		Refresher<? super CachingServiceReference<T>> onModified) {
 
-		return new ServiceReferenceOSGi<>(filterString, clazz, onModified).map(UpdateTuple::getT);
+		return refreshWhen(
+			serviceReferences(clazz, filterString),
+			(__, csr) -> onModified.test(csr));
+
 	}
 
 	static <T> OSGi<CachingServiceReference<T>> serviceReferences(
 		Class<T> clazz,
 		Refresher<? super CachingServiceReference<T>> onModified) {
 
-		return new ServiceReferenceOSGi<>(null, clazz, onModified).map(UpdateTuple::getT);
+		return refreshWhen(
+			serviceReferences(clazz, (String)null),
+			(__, csr) -> onModified.test(csr));
 	}
 
 	static OSGi<CachingServiceReference<Object>> serviceReferences(
 		String filterString,
 		Refresher<? super CachingServiceReference<Object>> onModified) {
 
-		return new ServiceReferenceOSGi<>(filterString, null, onModified).map(UpdateTuple::getT);
+		return refreshWhen(
+			serviceReferences(null, filterString),
+			(__, csr) -> onModified.test(csr));
 	}
 
 	static <T> OSGi<T> services(Class<T> clazz) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
index ad56515..8085975 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGiResult.java
@@ -31,6 +31,8 @@ public interface OSGiResult extends AutoCloseable, Runnable {
 		close();
 	}
 
-	public default void update(UpdateSelector updateSelector) {};
+	public default boolean update(UpdateSelector updateSelector) {
+		return false;
+	};
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
index 24e5124..a6f98e7 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AggregateOSGiResult.java
@@ -46,15 +46,19 @@ public class AggregateOSGiResult implements OSGiResult {
     }
 
     @Override
-    public void update(UpdateSelector updateSelector) {
+    public boolean update(UpdateSelector updateSelector) {
+        boolean bool = false;
+
         if (!_closed.get()) {
             for (OSGiResult result : results) {
                 try {
-                    result.update(updateSelector);
+                    bool |= result.update(updateSelector);
                 } catch (Exception e) {
                 }
             }
         }
+
+        return bool;
     }
 
     private final AtomicBoolean _closed = new AtomicBoolean();
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
index aa652c6..341792e 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/AllOSGi.java
@@ -46,7 +46,7 @@ public class AllOSGi<T> extends OSGiImpl<T> {
 
             return new OSGiResultImpl(
                 () -> cleanUp(results),
-                us -> results.forEach(result -> result.update(us))
+                us -> results.stream().map(result -> result.update(us)).reduce(Boolean.FALSE, Boolean::logicalOr)
             );
         });
     }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
index cb554a2..4f54586 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java
@@ -136,12 +136,13 @@ public class BaseOSGiImpl<T> implements OSGi<T> {
 							},
 							us -> {
 								synchronized (identities) {
-
-									identities.forEach(t -> {
+									return identities.stream().map(t -> {
 										OSGiResult terminator = terminators.get(t).get(f);
 
-										terminator.update(us);
-									});
+										return terminator.update(us);
+									}).reduce(
+										Boolean.FALSE, Boolean::logicalOr
+									);
 								}
 							}
 						);
@@ -176,11 +177,13 @@ public class BaseOSGiImpl<T> implements OSGi<T> {
 							},
 							us -> {
 								synchronized (identities) {
-									functions.forEach(f -> {
+									return functions.stream().map(f -> {
 										OSGiResult terminator = terminators.get(t).get(f);
 
-										terminator.update(us);
-									});
+										return terminator.update(us);
+									}).reduce(
+										Boolean.FALSE, Boolean::logicalOr
+									);
 								}
 							}
 						);
@@ -281,7 +284,7 @@ public class BaseOSGiImpl<T> implements OSGi<T> {
 									}
 								}
 
-								terminator.update(us);
+								return terminator.update(us);
 							}
 						);
 
@@ -397,11 +400,11 @@ public class BaseOSGiImpl<T> implements OSGi<T> {
 
 					result.close();
 				},
-				us -> {
-					pads.values().forEach(pad -> pad.update(us));
-
-					result.close();
-				}
+				us -> pads.values().stream().map(
+					pad -> pad.update(us)
+				).reduce(
+					Boolean.FALSE, Boolean::logicalOr
+				) | result.update(us)
 			);
 		});
 	}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
index 2b5ca5c..9eae7fd 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BundleOSGi.java
@@ -66,7 +66,11 @@ public class BundleOSGi extends OSGiImpl<Bundle> {
 
 			return new OSGiResultImpl(
 				bundleTracker::close,
-				us -> bundleTracker.getTracked().values().forEach(result -> result.update(us))
+				us -> bundleTracker.getTracked().values().stream().map(
+					result -> result.update(us)
+				).reduce(
+					Boolean.FALSE, Boolean::logicalOr
+				)
 			);
 		});
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
index 3aff2be..6ce33a6 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java
@@ -21,6 +21,7 @@ import org.apache.aries.component.dsl.OSGi;
 import org.apache.aries.component.dsl.OSGiResult;
 import org.apache.aries.component.dsl.Publisher;
 
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -96,7 +97,7 @@ public class CoalesceOSGiImpl<T> extends OSGiImpl<T> {
                     }),
                     us -> {
                         synchronized (initialized) {
-                            result.get().update(us);
+                            return result.get().update(us);
                         }
                     });
                 };
@@ -131,9 +132,11 @@ public class CoalesceOSGiImpl<T> extends OSGiImpl<T> {
                 },
                 us -> {
                     synchronized (initialized) {
-                        for (int i = 0; i <= index.get(); i++) {
-                            results[i].update(us);
-                        }
+                        return Arrays.stream(results).map(
+                            res -> res.update(us)
+                        ).reduce(
+                            Boolean.FALSE, Boolean::logicalOr
+                        );
                     }
                 }
             );
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
index 26cefbe..36c8514 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationOSGiImpl.java
@@ -45,7 +45,7 @@ public class ConfigurationOSGiImpl extends OSGiImpl<Dictionary<String, ?>> {
 
 			AtomicReference<OSGiResult>
 				terminatorAtomicReference = new AtomicReference<>(
-					new OSGiResultImpl(NOOP, __ -> {}));
+					new OSGiResultImpl(NOOP, __ -> false));
 
 			AtomicBoolean closed = new AtomicBoolean();
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
index fe16e10..850533d 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java
@@ -150,13 +150,11 @@ public class ConfigurationsOSGiImpl extends OSGiImpl<Dictionary<String, ?>> {
 						}
 					}
 				},
-				us -> {
-					for (OSGiResult osgiResult : terminators.values()) {
-						if (osgiResult != null) {
-							osgiResult.run();
-						}
-					}
-				});
+				us -> terminators.values().stream().map(
+					osgiResult -> osgiResult.update(us)
+				).reduce(
+					Boolean.FALSE, Boolean::logicalOr
+				));
 		});
 	}
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
index 5f21e76..35e9e55 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java
@@ -61,7 +61,11 @@ public class DistributeOSGiImpl<T, S> extends BaseOSGiImpl<S> {
 
                     return new OSGiResultImpl(
                         () -> cleanUp(terminators),
-                        us -> terminators.forEach(os -> os.update(us))
+                        us -> terminators.stream().map(
+                            os -> os.update(us)
+                        ).reduce(
+                            Boolean.FALSE, Boolean::logicalOr
+                        )
                     );
                 }));
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
index 932f29c..747a883 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/EffectsOSGi.java
@@ -68,7 +68,7 @@ public class EffectsOSGi extends OSGiImpl<Void> {
                             }
                         }
 
-                        terminator.update(us);
+                        return terminator.update(us);
                     }
                 );
 
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
index e9dc9fb..31d01e3 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java
@@ -97,7 +97,7 @@ public class HighestRankingOSGi<T> extends OSGiImpl<T> {
                             synchronized (set) {
                                 Tuple<T> current = set.peek();
 
-                                current.osgiResult.update(us);
+                                return current.osgiResult.update(us);
                             }
                         }
                     );
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
index 8b4dae7..cd516d6 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/JustOSGiImpl.java
@@ -19,6 +19,8 @@
 package org.apache.aries.component.dsl.internal;
 
 
+import org.apache.aries.component.dsl.OSGiResult;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,7 +40,7 @@ public class JustOSGiImpl<T> extends OSGiImpl<T> {
 		super((executionContext, op) -> {
 
 			Collection<T> collection = supplier.get();
-			ArrayList<Runnable> references = new ArrayList<>(collection.size());
+			ArrayList<OSGiResult> references = new ArrayList<>(collection.size());
 
 			try {
 				for (T t : collection) {
@@ -53,13 +55,13 @@ public class JustOSGiImpl<T> extends OSGiImpl<T> {
 
 			return new OSGiResultImpl(
 				() -> cleanUp(references),
-				us -> {}
+				us -> references.stream().map(res -> res.update(us)).reduce(Boolean.FALSE, Boolean::logicalOr)
 			);
 		});
 	}
 
-	private static void cleanUp(ArrayList<Runnable> references) {
-		ListIterator<Runnable> iterator =
+	private static void cleanUp(ArrayList<OSGiResult> references) {
+		ListIterator<OSGiResult> iterator =
 			references.listIterator(references.size());
 
 		while (iterator.hasPrevious()) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
index 5c2dfd3..ff8b0e6 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/NothingOSGiImpl.java
@@ -25,6 +25,6 @@ import org.apache.aries.component.dsl.OSGi;
 public class NothingOSGiImpl<S> extends OSGiImpl<S> {
 
 	public NothingOSGiImpl() {
-		super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP,  ___ -> {}));
+		super((executionContext, __) -> new OSGiResultImpl(OSGi.NOOP,  ___ -> false));
 	}
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
index bce94c3..36a0745 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OSGiResultImpl.java
@@ -21,14 +21,14 @@ import org.apache.aries.component.dsl.OSGiResult;
 import org.apache.aries.component.dsl.update.UpdateSelector;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class OSGiResultImpl implements OSGiResult {
 
-	public OSGiResultImpl(Runnable close, Consumer<UpdateSelector> onUpdate) {
+	public OSGiResultImpl(Runnable close, Predicate<UpdateSelector> onUpdate) {
 		this.close = close;
 		this.onUpdate = onUpdate;
 	}
@@ -41,17 +41,16 @@ public class OSGiResultImpl implements OSGiResult {
 	}
 
 	@Override
-	public void update(UpdateSelector updateSelector) {
+	public boolean update(UpdateSelector updateSelector) {
 		if (_closed.get()) {
-			return;
+			return false;
 		}
 
-		onUpdate.accept(updateSelector);
+		return onUpdate.test(updateSelector);
 	}
 
 	private final Runnable close;
-	private Consumer<UpdateSelector> onUpdate;
-	private Runnable update;
+	private Predicate<UpdateSelector> onUpdate;
 	private AtomicBoolean _closed = new AtomicBoolean();
 
 }
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
index fd6f305..2da4baa 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/Pad.java
@@ -56,8 +56,8 @@ public class Pad<T, S> implements Publisher<T>, OSGiResult {
     }
 
     @Override
-    public void update(UpdateSelector updateSelector) {
-        _result.update(updateSelector);
+    public boolean update(UpdateSelector updateSelector) {
+        return _result.update(updateSelector);
     }
 
     @Override
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshWhenOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshWhenOSGi.java
new file mode 100644
index 0000000..cab0a0b
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshWhenOSGi.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
+import org.apache.aries.component.dsl.Refresher;
+import org.apache.aries.component.dsl.update.UpdateSelector;
+import org.apache.aries.component.dsl.update.UpdateTuple;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class RefreshWhenOSGi<T> extends OSGiImpl<T> {
+
+    public RefreshWhenOSGi(OSGi<T> program, BiPredicate<UpdateSelector, T> refresher) {
+        super((executionContext, op) -> program.run(
+            executionContext,
+            op.pipe(
+                t -> {
+                    OSGiResult osgiResult = op.publish(t);
+
+                    return new OSGiResultImpl(
+                        osgiResult::close,
+                        us -> {
+                            if (refresher.test(us, t)) {
+                                return true;
+                            }
+
+                            return osgiResult.update(us);
+                        }
+                    );
+                }
+            )));
+    }
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
index 03698aa..cac5c73 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ServiceReferenceOSGi.java
@@ -18,7 +18,6 @@
 package org.apache.aries.component.dsl.internal;
 
 import org.apache.aries.component.dsl.OSGiResult;
-import org.apache.aries.component.dsl.Refresher;
 import org.apache.aries.component.dsl.CachingServiceReference;
 import org.apache.aries.component.dsl.Publisher;
 import org.apache.aries.component.dsl.update.UpdateSelector;
@@ -33,29 +32,24 @@ import org.osgi.util.tracker.ServiceTrackerCustomizer;
 public class ServiceReferenceOSGi<T>
 	extends OSGiImpl<UpdateTuple<CachingServiceReference<T>>> implements UpdateSelector {
 
-	public ServiceReferenceOSGi(
-		String filterString, Class<T> clazz) {
-
-		this(filterString, clazz, CachingServiceReference::isDirty);
-	}
-
-	public ServiceReferenceOSGi(
-		String filterString, Class<T> clazz,
-		Refresher<? super CachingServiceReference<T>> refresher) {
+	public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
 
 		super((executionContext, op) -> {
 			ServiceTracker<T, Tracked<T>>
 				serviceTracker = new ServiceTracker<>(
 					executionContext.getBundleContext(),
 					buildFilter(executionContext, filterString, clazz),
-					new DefaultServiceTrackerCustomizer<>(op, refresher));
+					new DefaultServiceTrackerCustomizer<>(op));
 
 			serviceTracker.open();
 
 			return new OSGiResultImpl(
 				serviceTracker::close,
-				us -> serviceTracker.getTracked().forEach(
-					(__, tracked) -> tracked.runnable.update(us))
+				us -> serviceTracker.getTracked().values().stream().map(
+					tracked -> tracked.runnable.update(us)
+				).reduce(
+					Boolean.FALSE, Boolean::logicalOr
+				)
 			);
 		});
 	}
@@ -64,11 +58,9 @@ public class ServiceReferenceOSGi<T>
 		implements ServiceTrackerCustomizer<T, Tracked<T>>, UpdateSelector{
 
 		public DefaultServiceTrackerCustomizer(
-			Publisher<? super UpdateTuple<CachingServiceReference<T>>> addedSource,
-			Refresher<? super CachingServiceReference<T>> refresher) {
+			Publisher<? super UpdateTuple<CachingServiceReference<T>>> addedSource) {
 
 			_addedSource = addedSource;
-			_refresher = refresher;
 		}
 
 		@Override
@@ -85,18 +77,16 @@ public class ServiceReferenceOSGi<T>
 		public void modifiedService(
 			ServiceReference<T> reference, Tracked<T> tracked) {
 
-			if (_refresher.test(tracked.cachingServiceReference)) {
+			if (tracked.runnable.update(this)) {
 				UpdateSupport.runUpdate(() -> {
 					tracked.runnable.run();
 					tracked.cachingServiceReference = new CachingServiceReference<>(
 						reference);
 					tracked.runnable =
-						_addedSource.apply(new UpdateTuple<>(this, tracked.cachingServiceReference));
+						_addedSource.apply(
+							new UpdateTuple<>(this, tracked.cachingServiceReference));
 				});
 			}
-			else {
-				tracked.runnable.update(this);
-			}
 		}
 
 		@Override
@@ -107,7 +97,6 @@ public class ServiceReferenceOSGi<T>
 		}
 
 		private final Publisher<? super UpdateTuple<CachingServiceReference<T>>> _addedSource;
-		private Refresher<? super CachingServiceReference<T>> _refresher;
 
 	}