You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2021/03/08 17:28:08 UTC

[aries-component-dsl] 03/10: Move to own classes and unify implementation

This is an automated email from the ASF dual-hosted git repository.

csierra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-component-dsl.git

commit 9097f3c336f6761cc02c5c47f8b321a410a79e18
Author: Carlos Sierra Andrés <ca...@liferay.com>
AuthorDate: Mon Mar 8 14:19:24 2021 +0100

    Move to own classes and unify implementation
---
 .../java/org/apache/aries/component/dsl/OSGi.java  | 77 +--------------------
 .../component/dsl/internal/OnceTransformer.java    | 55 +++++++++++++++
 .../dsl/internal/RefreshAsUpdatesTransformer.java  | 78 ++++++++++++++++++++++
 3 files changed, 135 insertions(+), 75 deletions(-)

diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
index de175db..b32ae17 100644
--- a/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/OSGi.java
@@ -53,8 +53,6 @@ import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.*;
 
 /**
@@ -376,29 +374,7 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 	}
 
 	static <T> OSGi<T> once(OSGi<T> program) {
-		return program.transform(op -> {
-			AtomicInteger count = new AtomicInteger();
-
-			AtomicReference<Runnable> terminator = new AtomicReference<>();
-
-			return t -> {
-				if (count.getAndIncrement() == 0) {
-					UpdateSupport.deferPublication(
-						() -> terminator.set(op.apply(t)));
-				}
-
-				return () -> {
-					if (count.decrementAndGet() == 0) {
-						UpdateSupport.deferTermination(() -> {
-							Runnable runnable = terminator.getAndSet(NOOP);
-
-							runnable.run();
-						});
-					}
-
-				};
-			};
-		});
+		return program.transform(new OnceTransformer<>());
 	}
 
 	static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
@@ -455,56 +431,7 @@ public interface OSGi<T> extends OSGiRunnable<T> {
 	}
 
 	static <T> OSGi<T> refreshAsUpdates(OSGi<T> program) {
-		return program.transform(op -> {
-			class ResultState {
-				boolean gone;
-				OSGiResult result;
-
-				public ResultState(boolean gone, OSGiResult result) {
-					this.gone = gone;
-					this.result = result;
-				}
-			}
-
-			ThreadLocal<ResultState> threadLocal = ThreadLocal.withInitial(() -> null);
-
-			return t -> {
-				AtomicReference<OSGiResult> atomicReference = new AtomicReference<>(NOOP);
-
-				if (!UpdateSupport.isUpdate()) {
-					atomicReference.set(op.publish(t));
-				}
-				else {
-					threadLocal.get().gone = false;
-				}
-
-				return new OSGiResultImpl(
-					() -> {
-						if (!UpdateSupport.isUpdate()) {
-							atomicReference.getAndSet(NOOP).run();
-						}
-						else {
-							threadLocal.set(new ResultState(true, atomicReference.get()));
-
-							UpdateSupport.deferTermination(
-								() -> {
-									if (threadLocal.get().gone) {
-										threadLocal.get().result.run();
-
-										threadLocal.remove();
-										atomicReference.set(NOOP);
-									}
-									else {
-										threadLocal.get().result.update();
-									}
-								}
-							);
-						}
-					},
-					() -> atomicReference.get().update()
-				);
-			};
-		});
+		return program.transform(new RefreshAsUpdatesTransformer<>());
 	}
 
 	static <T> OSGi<T> refreshWhen(OSGi<T> program, Predicate<T> refresher) {
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnceTransformer.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnceTransformer.java
new file mode 100644
index 0000000..8feaae9
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnceTransformer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
+import org.apache.aries.component.dsl.Transformer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class OnceTransformer<T> implements Transformer<T, T> { // used by OSGi.once(program)
+    /** Wraps {@code op} so it is invoked only when the open-publication count goes 0 -> 1, and its result is run when the count drops back to 0. */
+    @Override
+    public Publisher<T> transform(Publisher<? super T> op) {
+        AtomicInteger count = new AtomicInteger();
+        // count (above): incremented on every publish through the returned publisher, decremented by the first action of each returned result.
+        AtomicReference<OSGiResult> terminator = new AtomicReference<>();
+        // terminator (above): holds the result of op.apply; it is null until the callback given to UpdateSupport.deferPublication stores a value.
+        return t -> {
+            if (count.getAndIncrement() == 0) {
+                UpdateSupport.deferPublication(
+                    () -> terminator.set(op.apply(t)));
+            }
+            // NOTE(review): if the deferred publication has not run yet, terminator.get() below is still null — confirm UpdateSupport ordering.
+            return new OSGiResultImpl(() -> {
+                if (count.decrementAndGet() == 0) {
+                    UpdateSupport.deferTermination(() -> {
+                        Runnable runnable = terminator.getAndSet(OSGi.NOOP);
+                        // terminator now holds NOOP; run the result that was stored before the swap.
+                        runnable.run();
+                    });
+                }},
+                () -> terminator.get().update() // forwards update() to whatever terminator currently holds
+            );
+        };
+    }
+
+}
diff --git a/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshAsUpdatesTransformer.java b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshAsUpdatesTransformer.java
new file mode 100644
index 0000000..7f711bd
--- /dev/null
+++ b/component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshAsUpdatesTransformer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.component.dsl.internal;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.component.dsl.Publisher;
+import org.apache.aries.component.dsl.Transformer;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RefreshAsUpdatesTransformer<T> implements Transformer<T, T> { // used by OSGi.refreshAsUpdates(program)
+    /** Wraps {@code op} so that, while UpdateSupport.isUpdate() is true, a close followed by a publish ends in update() on the existing downstream result rather than running it. */
+    @Override
+    public Publisher<T> transform(Publisher<? super T> op) {
+        ThreadLocal<ResultState> threadLocal = ThreadLocal.withInitial(() -> null);
+        // threadLocal (above): per-thread hand-off written by a close during an update and read by a later publish and by the deferred callback.
+        return t -> {
+            AtomicReference<OSGiResult> atomicReference = new AtomicReference<>(OSGi.NOOP);
+            // atomicReference (above): downstream result of this publication; stays NOOP when the publish happens during an update.
+            if (!UpdateSupport.isUpdate()) {
+                atomicReference.set(op.publish(t));
+            } else {
+                threadLocal.get().gone = false;
+            }
+            // NOTE(review): the update branch dereferences threadLocal.get(), which is null unless a close already stored a ResultState on this thread.
+            return new OSGiResultImpl(
+                () -> {
+                    if (!UpdateSupport.isUpdate()) {
+                        atomicReference.getAndSet(OSGi.NOOP).run();
+                    } else {
+                        threadLocal.set(new ResultState(true, atomicReference.get()));
+                        // Recorded as gone for now; a publish during the update resets the flag, and the deferred callback below decides based on it.
+                        UpdateSupport.deferTermination(
+                            () -> {
+                                if (threadLocal.get().gone) {
+                                    threadLocal.get().result.run();
+                                    // Flag was never reset: the recorded result has been run above, so drop the per-thread state and the reference.
+                                    threadLocal.remove();
+                                    atomicReference.set(OSGi.NOOP);
+                                } else {
+                                    threadLocal.get().result.update(); // flag was reset by a publish: update the recorded result instead of running it
+                                }
+                            }
+                        );
+                    }
+                },
+                () -> atomicReference.get().update() // NOTE(review): a result published during an update keeps NOOP here — confirm that is intended
+            );
+        };
+    }
+    /** Mutable holder pairing a {@code gone} flag with the result it refers to. */
+    private static class ResultState {
+        boolean gone;
+        OSGiResult result;
+
+        public ResultState(boolean gone, OSGiResult result) {
+            this.gone = gone;
+            this.result = result;
+        }
+    }
+
+}