You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/08/28 18:37:38 UTC
svn commit: r1806482 -
/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/
Author: csierra
Date: Mon Aug 28 18:37:37 2017
New Revision: 1806482
URL: http://svn.apache.org/viewvc?rev=1806482&view=rev
Log:
Abstract flatMap and implement it thread safe
Added:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Mon Aug 28 18:37:37 2017
@@ -18,6 +18,7 @@
package org.apache.aries.osgi.functional.internal;
import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleEvent;
import org.osgi.util.tracker.BundleTracker;
@@ -81,11 +82,14 @@ public class BundleOSGi extends OSGiImpl
return new OSGiResultImpl<>(
added, removed, bundleTracker::open, bundleTracker::close);
});
+
_stateMask = stateMask;
}
@Override
- public <S> OSGiImpl<S> flatMap(Function<? super Bundle, OSGi<? extends S>> fun) {
+ public <S> OSGiImpl<S> flatMap(
+ Function<? super Bundle, OSGi<? extends S>> fun) {
+
return new OSGiImpl<>(bundleContext -> {
Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
@@ -95,13 +99,13 @@ public class BundleOSGi extends OSGiImpl
Consumer<Tuple<S>> removedSource = removed.getSource();
- BundleTracker<Tracked<Bundle, S>> bundleTracker =
+ BundleTracker<OSGiResult<S>> bundleTracker =
new BundleTracker<>(
bundleContext, _stateMask,
- new BundleTrackerCustomizer<Tracked<Bundle, S>>() {
+ new BundleTrackerCustomizer<OSGiResult<S>>() {
@Override
- public Tracked<Bundle, S> addingBundle(
+ public OSGiResult<S> addingBundle(
Bundle bundle, BundleEvent bundleEvent) {
OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
@@ -110,30 +114,17 @@ public class BundleOSGi extends OSGiImpl
OSGiResultImpl<S> result =
program._operation.run(bundleContext);
- Tracked<Bundle, S> tracked = new Tracked<>();
-
- tracked.service = bundle;
- tracked.program = result;
-
- result.added.map(s -> {
- tracked.result = s;
+ result.pipeTo(addedSource, removedSource);
- addedSource.accept(s);
-
- return s;
- });
-
- result.start.run();
-
- return tracked;
+ return result;
}
@Override
public void modifiedBundle(
Bundle bundle, BundleEvent bundleEvent,
- Tracked<Bundle, S> tracked) {
+ OSGiResult<S> result) {
- removedBundle(bundle, bundleEvent, tracked);
+ removedBundle(bundle, bundleEvent, result);
addingBundle(bundle, bundleEvent);
}
@@ -141,13 +132,9 @@ public class BundleOSGi extends OSGiImpl
@Override
public void removedBundle(
Bundle bundle, BundleEvent bundleEvent,
- Tracked<Bundle, S> tracked) {
-
- tracked.program.close();
+ OSGiResult<S> result) {
- if (tracked.result != null) {
- removedSource.accept(tracked.result);
- }
+ result.close();
}
});
Added: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java?rev=1806482&view=auto
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java (added)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java Mon Aug 28 18:37:37 2017
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class FlatMapImpl<T, S> extends OSGiImpl<S> {
+
+ public FlatMapImpl(
+ OSGiImpl<T> previous, Function<? super T, OSGi<? extends S>> fun) {
+
+ super((bundleContext) -> {
+ Map<IdentityKey<Object>, OSGiResult<?>> identities =
+ new ConcurrentHashMap<>();
+
+ AtomicReference<Runnable> closeReference =
+ new AtomicReference<>(NOOP);
+
+ Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+ Consumer<Tuple<S>> addedSource = added.getSource();
+
+ Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+ Consumer<Tuple<S>> removedSource = removed.getSource();
+
+ return new OSGiResultImpl<>(
+ added, removed,
+ () -> {
+ OSGiResultImpl<T> or1 = previous._operation.run(
+ bundleContext);
+
+ closeReference.set(or1.close);
+
+ or1.added.map(t -> {
+ OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(t.t);
+
+ OSGiResultImpl<S> or2 =
+ program._operation.run(bundleContext);
+
+ or2.pipeTo(addedSource, removedSource);
+
+ identities.put(new IdentityKey<>(t.original), or2);
+
+ return null;
+ });
+
+ or1.removed.map(t -> {
+ OSGiResult<?> osgiResult1 = identities.remove(
+ new IdentityKey<>(t.original));
+
+ if (osgiResult1 != null) {
+ osgiResult1.close();
+ }
+
+ return null;
+ });
+
+ or1.start.run();
+ },
+ () -> {
+ identities.values().forEach(OSGiResult::close);
+
+ closeReference.get().run();
+ });
+ }
+ );
+ }
+
+ private static class IdentityKey<T> {
+
+ private final T _instance;
+
+ public IdentityKey(T instance) {
+ _instance = instance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ IdentityKey<?> that = (IdentityKey<?>) o;
+
+ return _instance == that._instance;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(_instance);
+ }
+
+ }
+
+}
+
+
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Mon Aug 28 18:37:37 2017
@@ -25,7 +25,6 @@ import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
@@ -35,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
/**
* @author Carlos Sierra Andrés
@@ -50,70 +48,7 @@ public class OSGiImpl<T> implements OSGi
@Override
public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
- return new OSGiImpl<>(
- ((bundleContext) -> {
- Map<Object, OSGiResult<?>> identities =
- new IdentityHashMap<>();
-
- AtomicReference<Runnable> closeReference =
- new AtomicReference<>(NOOP);
-
- Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
-
- Consumer<Tuple<S>> addedSource = added.getSource();
-
- Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
-
- Consumer<Tuple<S>> removedSource = removed.getSource();
-
- return new OSGiResultImpl<>(
- added, removed,
- () -> {
- OSGiResultImpl<T> or1 = _operation.run(bundleContext);
-
- closeReference.set(or1.close);
-
- or1.added.map(t -> {
- OSGiImpl<S> program =
- (OSGiImpl<S>)fun.apply(t.t);
-
- OSGiResultImpl<S> or2 =
- program._operation.run(bundleContext);
-
- or2.added.map(s -> {addedSource.accept(s); return Tuple.create(null);});
- or2.removed.map(s -> {removedSource.accept(s); return Tuple.create(null);});
-
- or2.start.run();
-
- identities.put(t.original, or2);
-
- return Tuple.create(null);
- });
-
- or1.removed.map(t -> {
- synchronized (identities) {
- OSGiResult<?> osgiResult1 =
- identities.remove(t.original);
-
- if (osgiResult1 != null) {
- osgiResult1.close();
- }
- }
-
- return Tuple.create(null);
- });
-
- or1.start.run();
- },
- () -> {
- synchronized (identities) {
- identities.values().forEach(OSGiResult::close);
- }
-
- closeReference.get().run();
- });
- }
- ));
+ return new FlatMapImpl<>(this, fun);
}
@Override
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Mon Aug 28 18:37:37 2017
@@ -19,6 +19,8 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGiResult;
+import java.util.function.Consumer;
+
/**
* @author Carlos Sierra Andrés
*/
@@ -44,4 +46,14 @@ public class OSGiResultImpl<T> implement
close.run();
}
+ public void pipeTo(
+ Consumer<Tuple<T>> addedSource, Consumer<Tuple<T>> removedSource) {
+
+ added.map(t -> {addedSource.accept(t); return null;});
+
+ removed.map(t -> {removedSource.accept(t); return null;});
+
+ start.run();
+ }
+
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java Mon Aug 28 18:37:37 2017
@@ -129,22 +129,18 @@ public class PrototypesOSGi<T>
bundleContext.getServiceObjects(
reference);
- OSGi<? extends S> program = fun.apply(serviceObjects);
+ OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(
+ serviceObjects);
Tracked<ServiceObjects<T>, S> tracked =
new Tracked<>();
- OSGiResult<? extends S> result = program.run(
- bundleContext, s -> {
- Tuple<S> tuple = Tuple.create(s);
+ OSGiResultImpl<S> result = program._operation.run(
+ bundleContext);
- tracked.result = tuple;
+ result.pipeTo(addedSource, removedSource);
- addedSource.accept(tuple);
- }
- );
-
- tracked.program = result;
+ tracked.result = result;
tracked.service = serviceObjects;
return tracked;
@@ -165,11 +161,7 @@ public class PrototypesOSGi<T>
ServiceReference<T> reference,
Tracked<ServiceObjects<T>, S> tracked) {
- tracked.program.close();
-
- if (tracked.result != null) {
- removedSource.accept(tracked.result);
- }
+ tracked.result.close();
}
});
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Mon Aug 28 18:37:37 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceObjects;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
@@ -28,8 +27,6 @@ import org.osgi.util.tracker.ServiceTrac
import java.util.function.Consumer;
import java.util.function.Function;
-import static org.apache.aries.osgi.functional.internal.OSGiImpl.buildFilter;
-
/**
* @author Carlos Sierra Andrés
*/
@@ -82,7 +79,7 @@ public class ServiceReferenceOSGi<T> ext
Consumer<Tuple<S>> removedSource = removed.getSource();
- ServiceTracker<T, Tracked<T, S>> serviceTracker =
+ ServiceTracker<T, OSGiResult<S>> serviceTracker =
new ServiceTracker<>(
bundleContext,
buildFilter(
@@ -137,7 +134,7 @@ public class ServiceReferenceOSGi<T> ext
}
private static class FlatMapServiceTrackerCustomizer<T, S>
- implements ServiceTrackerCustomizer<T, Tracked<T, S>> {
+ implements ServiceTrackerCustomizer<T, OSGiResult<S>> {
private final Function<? super ServiceReference<T>, OSGi<? extends S>>
_fun;
private final BundleContext _bundleContext;
@@ -157,27 +154,20 @@ public class ServiceReferenceOSGi<T> ext
}
@Override
- public Tracked<T, S> addingService(ServiceReference<T> reference) {
- OSGi<? extends S> program = _fun.apply(reference);
-
- Tracked<T, S> tracked = new Tracked<>();
+ public OSGiResult<S> addingService(ServiceReference<T> reference) {
+ OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
- tracked.program = program.run(
- _bundleContext, s -> {
- Tuple<S> tuple = Tuple.create(s);
+ OSGiResultImpl<S> osgiResult = program._operation.run(
+ _bundleContext);
- tracked.result = tuple;
+ osgiResult.pipeTo(_addedSource, _removedSource);
- _addedSource.accept(tuple);
- }
- );
-
- return tracked;
+ return osgiResult;
}
@Override
public void modifiedService(
- ServiceReference<T> reference, Tracked<T, S> tracked) {
+ ServiceReference<T> reference, OSGiResult<S> tracked) {
removedService(reference, tracked);
@@ -186,13 +176,9 @@ public class ServiceReferenceOSGi<T> ext
@Override
public void removedService(
- ServiceReference<T> reference, Tracked<T, S> tracked) {
-
- tracked.program.close();
+ ServiceReference<T> reference, OSGiResult<S> tracked) {
- if (tracked.result != null) {
- _removedSource.accept(tracked.result);
- }
+ tracked.close();
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java Mon Aug 28 18:37:37 2017
@@ -128,22 +128,18 @@ public class ServicesOSGi<T> extends OSG
T service = serviceObjects.getService();
- OSGi<? extends S> program = fun.apply(service);
+ OSGiImpl<S> program =
+ (OSGiImpl<S>)fun.apply(service);
- Tracked<T, S> tracked = new Tracked<>();
+ OSGiResultImpl<S> result = program._operation.run(
+ bundleContext);
+
+ result.pipeTo(addedSource, removedSource);
- OSGiResult<? extends S> result = program.run(
- bundleContext, s -> {
- Tuple<S> tuple = Tuple.create(s);
-
- tracked.result = tuple;
-
- addedSource.accept(tuple);
- }
- );
+ Tracked<T, S> tracked = new Tracked<>();
+ tracked.result = result;
tracked.service = service;
- tracked.program = result;
return tracked;
}
@@ -163,18 +159,12 @@ public class ServicesOSGi<T> extends OSG
ServiceReference<T> reference,
Tracked<T, S> tracked) {
- tracked.program.close();
-
- if (tracked.result != null) {
- removedSource.accept(tracked.result);
- }
+ tracked.result.close();
ServiceObjects<T> serviceObjects =
- bundleContext.getServiceObjects(
- reference);
+ bundleContext.getServiceObjects(reference);
- serviceObjects.ungetService(
- tracked.service);
+ serviceObjects.ungetService(tracked.service);
}
});
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java Mon Aug 28 18:37:37 2017
@@ -19,14 +19,14 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGiResult;
+
/**
* @author Carlos Sierra Andrés
*/
class Tracked<T, S> {
T service = null;
- OSGiResult<? extends S> program = null;
- Tuple<S> result = null;
+ OSGiResult<? extends S> result = null;
}