You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2019/10/09 19:58:27 UTC
[isis] branch v2 updated: ISIS-2158: extend spec-loading,
also allow for concurrent validation
This is an automated email from the ASF dual-hosted git repository.
ahuber pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git
The following commit(s) were added to refs/heads/v2 by this push:
new 04f0485 ISIS-2158: extend spec-loading, also allow for concurrent validation
04f0485 is described below
commit 04f04859c68a5f7f8db1bd01ba7341d6a3ff4222
Author: Andi Huber <ah...@apache.org>
AuthorDate: Wed Oct 9 21:58:19 2019 +0200
ISIS-2158: extend spec-loading, also allow for concurrent validation
- as an optimization step we also introduced a new highly specialized
List type '_VersionedList'
- _VersionedList allows to iterate over its elements while concurrently
adding elements to the list
- we have a special traversal function 'forEach(element)' that very fast
traverses all elements even those that get added during traversal
---
.../collections/snapshot/_VersionedList.java | 217 +++++++++++++++++++++
.../collections/snapshot/VersionedListTest.java | 69 +++++++
.../specloader/SpecificationCacheDefault.java | 21 +-
.../metamodel/specloader/SpecificationLoader.java | 11 ++
.../specloader/SpecificationLoaderDefault.java | 13 ++
.../validator/MetaModelValidatorVisiting.java | 50 ++++-
6 files changed, 370 insertions(+), 11 deletions(-)
diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java b/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java
new file mode 100644
index 0000000..becad61
--- /dev/null
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.commons.internal.collections.snapshot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.isis.commons.internal.base._With.requires;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.val;
+
+/**
+ * Thread-safe pseudo list, that increments its version each time a snapshot is requested.
+ * <p>
+ * This allows to easily keep track of any additions to the list that occurred in between
+ * snapshots.
+ *
+ * @since 2.0
+ * @param <T>
+ */
+public final class _VersionedList<T> {
+
+ private UUID uuid = UUID.randomUUID();
+ private final List<List<T>> versions = new ArrayList<>();
+ private List<T> currentVersion = new ArrayList<>();
+ private int size;
+
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ public final static class Snapshot<T> {
+ private UUID ownerUuid;
+ @SuppressWarnings("unused")
+ private final int fromIndex; //low endpoint (inclusive) of the copy
+ private final int toIndex; // high endpoint (exclusive) of the copy
+ private final List<List<T>> versions;
+
+ public boolean isEmpty() {
+ return versions.isEmpty();
+ }
+
+ public Stream<T> stream() {
+ return versions.stream()
+ .flatMap(List::stream);
+ }
+
+ public void forEach(Consumer<T> action) {
+ for(val ver : versions) {
+ for(val element : ver) {
+ action.accept(element);
+ }
+ }
+ }
+
+ public void forEachParallel(Consumer<T> action) {
+ for(val ver : versions) {
+ if(ver.size()>8) {
+ ver.parallelStream().forEach(action);
+ } else {
+ for(val element : ver) {
+ action.accept(element);
+ }
+ }
+ }
+ }
+
+
+ }
+
+ public Snapshot<T> snapshot() {
+ synchronized(versions) {
+ commit();
+ return new Snapshot<>(uuid, 0, versions.size(), defensiveCopy());
+ }
+ }
+
+ public Snapshot<T> deltaSince(Snapshot<T> snapshot) {
+
+ requires(snapshot, "snapshot");
+
+ if(snapshot.ownerUuid!=uuid) {
+ throw new IllegalArgumentException("Snapshot's UUID is different from the VersionedList's.");
+ }
+
+ synchronized(versions) {
+ commit();
+ val from = snapshot.toIndex;
+ val to = versions.size();
+ return new Snapshot<>(uuid, from, to, defensiveCopy(from, to));
+ }
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean isEmpty() {
+ return size==0;
+ }
+
+ /** current implementation cannot handle concurrent additions that occur during traversal*/
+ public Stream<T> stream() {
+ final List<List<T>> defensiveCopy;
+ synchronized(versions) {
+ commit();
+ defensiveCopy = defensiveCopy();
+ }
+ return defensiveCopy.stream()
+ .flatMap(List::stream);
+ }
+
+ public boolean add(T e) {
+ synchronized(versions) {
+ ++size;
+ return currentVersion.add(e);
+ }
+ }
+
+ public boolean addAll(Collection<? extends T> c) {
+ synchronized(versions) {
+ size+=c.size();
+ return currentVersion.addAll(c);
+ }
+ }
+
+ public void clear() {
+ synchronized(versions) {
+ uuid = UUID.randomUUID();
+ size=0;
+ versions.clear();
+ currentVersion.clear();
+ }
+ }
+
+ /**
+ * Also handles concurrent additions that occur during traversal.
+ * @param action
+ */
+ public void forEach(Consumer<T> action) {
+ val snapshot = snapshot();
+ snapshot.forEach(action);
+ Snapshot<T> delta = deltaSince(snapshot);
+ while(!delta.isEmpty()) {
+ delta.forEach(action);
+ delta = deltaSince(delta);
+ }
+ }
+
+ /**
+ * Also handles concurrent additions that occur during traversal.
+ * @param action
+ */
+ public void forEachParallel(Consumer<T> action) {
+ val snapshot = snapshot();
+ snapshot.forEachParallel(action);
+ Snapshot<T> delta = deltaSince(snapshot);
+ while(!delta.isEmpty()) {
+ delta.forEachParallel(action);
+ delta = deltaSince(delta);
+ }
+ }
+
+
+ // -- HELPER
+
+
+ /**
+ * @implNote only call within synchronized block!
+ * @param fromIndex low endpoint (inclusive) of the copy
+ * @param toIndex high endpoint (exclusive) of the copy
+ */
+ private List<List<T>> defensiveCopy(int fromIndex, int toIndex) {
+ if(fromIndex==toIndex) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(versions.subList(fromIndex, toIndex));
+ }
+
+ /** @implNote only call within synchronized block! */
+ private List<List<T>> defensiveCopy() {
+ if(versions.isEmpty()) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(versions);
+ }
+
+ /** @implNote only call within synchronized block! */
+ private void commit() {
+ if(!currentVersion.isEmpty()) {
+ versions.add(currentVersion);
+ currentVersion = new ArrayList<>(); // create a new array for others to write to next
+ }
+ }
+
+
+}
diff --git a/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java b/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java
new file mode 100644
index 0000000..d659f22
--- /dev/null
+++ b/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.isis.commons.internal.collections.snapshot;
+
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import lombok.val;
+
+class VersionedListTest {
+
+ @Test
+ void test() {
+
+ val vList = new _VersionedList<String>();
+
+ assertEquals(0, vList.size());
+ assertTrue(vList.isEmpty());
+
+ vList.add("foo");
+
+ assertEquals(1, vList.size());
+ assertFalse(vList.isEmpty());
+
+ val snapshot1 = vList.snapshot();
+ assertEquals(1, snapshot1.stream().count());
+
+ vList.add("bar");
+
+ val snapshot2 = vList.snapshot();
+ assertEquals(2, snapshot2.stream().count());
+ assertEquals("foo,bar", snapshot2.stream().collect(Collectors.joining(",")));
+
+ val delta = vList.deltaSince(snapshot1);
+ assertEquals(1, delta.stream().count());
+ assertEquals("bar", delta.stream().collect(Collectors.joining(",")));
+
+ vList.add("gru");
+
+ val snapshot3 = vList.snapshot();
+ assertEquals("foo,bar,gru", snapshot3.stream().collect(Collectors.joining(",")));
+
+ val snapshot4 = vList.deltaSince(snapshot3);
+ assertTrue(snapshot4.isEmpty());
+
+ }
+
+}
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
index 66aa7c9..47720e3 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
@@ -25,10 +25,13 @@ import java.util.Map;
import java.util.function.Function;
import org.apache.isis.commons.internal.collections._Maps;
+import org.apache.isis.commons.internal.collections.snapshot._VersionedList;
import org.apache.isis.metamodel.facets.object.objectspecid.ObjectSpecIdFacet;
import org.apache.isis.metamodel.spec.ObjectSpecId;
import org.apache.isis.metamodel.spec.ObjectSpecification;
+import lombok.AccessLevel;
+import lombok.Getter;
import lombok.val;
@@ -37,6 +40,10 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
private final Map<String, T> specByClassName = _Maps.newHashMap();
private final Map<ObjectSpecId, String> classNameBySpecId = _Maps.newHashMap();
+ // optimization: specialized list to keep track of any additions to the cache fast
+ @Getter(value = AccessLevel.PACKAGE)
+ private final _VersionedList<T> vList = new _VersionedList<>();
+
public T get(String className) {
return specByClassName.get(className);
}
@@ -54,8 +61,11 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
}
public void clear() {
- specByClassName.clear();
- classNameBySpecId.clear();
+ synchronized(this) {
+ specByClassName.clear();
+ classNameBySpecId.clear();
+ vList.clear();
+ }
}
/** @returns thread-safe defensive copy */
@@ -64,7 +74,7 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
return new ArrayList<T>(specByClassName.values());
}
}
-
+
public T getByObjectType(final ObjectSpecId objectSpecID) {
val className = classNameBySpecId.get(objectSpecID);
return className != null ? specByClassName.get(className) : null;
@@ -76,7 +86,10 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
}
val className = spec.getCorrespondingClass().getName();
val specId = spec.getSpecId();
- specByClassName.put(className, spec);
+ val existing = specByClassName.put(className, spec);
+ if(existing==null) {
+ vList.add(spec); // add to vList only if we don't have it already
+ }
if (specId == null) {
return;
}
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
index 152f5a2..dd871e1 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
@@ -19,6 +19,7 @@
package org.apache.isis.metamodel.specloader;
import java.util.Collection;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
@@ -71,6 +72,14 @@ public interface SpecificationLoader {
* @return snapshot of all the (currently) loaded specifications, a defensive-copy
*/
Collection<ObjectSpecification> snapshotSpecifications();
+
+ /**
+ * Similar to {@link #snapshotSpecifications()}, but also handles concurrent additions that occur
+ * during traversal.
+ *
+ * @param action
+ */
+ void forEach(Consumer<ObjectSpecification> onSpec);
/**
* Lookup a specification that has bean loaded before.
@@ -118,4 +127,6 @@ public interface SpecificationLoader {
+
+
}
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
index 6dcf67a..c260fe5 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
@@ -20,6 +20,7 @@ package org.apache.isis.metamodel.specloader;
import java.util.Collection;
import java.util.List;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -287,6 +288,18 @@ public class SpecificationLoaderDefault implements SpecificationLoader {
public Collection<ObjectSpecification> snapshotSpecifications() {
return cache.snapshotSpecs();
}
+
+ @Override
+ public void forEach(Consumer<ObjectSpecification> onSpec) {
+ val shouldRunConcurrent = isisConfiguration.getReflector().getIntrospector().isParallelize();
+ val vList = cache.getVList(); // vList is thread-safe
+ if(shouldRunConcurrent) {
+ vList.forEachParallel(onSpec);
+ } else {
+ vList.forEach(onSpec);
+ }
+
+ }
@Override
public ObjectSpecification lookupBySpecIdElseLoad(ObjectSpecId objectSpecId) {
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
index 84b2aa2..a6b568b 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
@@ -21,10 +21,13 @@ package org.apache.isis.metamodel.specloader.validator;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.isis.commons.internal.collections._Lists;
+import org.apache.isis.commons.internal.debug._Probe;
import org.apache.isis.metamodel.MetaModelContext;
import org.apache.isis.metamodel.spec.ObjectSpecification;
+import org.apache.isis.metamodel.specloader.SpecificationLoaderDefault;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@@ -50,17 +53,38 @@ public class MetaModelValidatorVisiting extends MetaModelValidatorAbstract {
@Override
public void collectFailuresInto(@NonNull ValidationFailures validationFailures) {
validateAll();
+ //validateAllLegacy();
summarize();
super.collectFailuresInto(validationFailures);
}
+ private static _Probe probe = _Probe.unlimited().label("spec");
+
private void validateAll() {
- val specsValidated = _Lists.<ObjectSpecification>newArrayList();
+ val specLoader = (SpecificationLoaderDefault)MetaModelContext.current().getSpecificationLoader();
+ val ladd = new LongAdder();
+
+ specLoader.forEach(spec->{
+ ladd.increment();
+ visitor.visit(spec, this);
+ });
+
+ //probe.println("specsToValidate " + ladd.longValue());
+
+ }
+
+ //TODO[2158] cleanup legacy
+ private void validateAllLegacy() {
+ val shouldRunConcurrent = getConfiguration().getReflector().getIntrospector().isParallelize();
+ val specsValidated = _Lists.<ObjectSpecification>newArrayList();
+
while(validateSpecs(specsValidated)) {
// validate in a loop, because the act of validating might cause additional specs to be uncovered
}
+
+ probe.println("specsToValidate " + specsValidated.size());
}
@@ -76,20 +100,32 @@ public class MetaModelValidatorVisiting extends MetaModelValidatorAbstract {
// don't validate any specs already processed
specsToValidate.removeAll(specsAlreadyValidated);
+
if(specsToValidate.isEmpty()) {
// don't call us again
return false;
}
+
// validate anything new
- for (val objSpec : specsToValidate) {
- if(!visitor.visit(objSpec, this)) {
- break;
- }
- }
-
+
// add the new specs just validated to the list (for next time)
specsAlreadyValidated.addAll(specsToValidate);
+
+ val isConcurrentFromConfig = getConfiguration().getReflector().getIntrospector().isParallelize();
+ val runSequential = !isConcurrentFromConfig;
+ if(runSequential) {
+
+ for (val spec : specsToValidate) {
+ if(!visitor.visit(spec, this)) {
+ break;
+ }
+ }
+
+ } else {
+ specsToValidate.parallelStream()
+ .forEach(spec -> visitor.visit(spec, this));
+ }
// go round the loop again
return true;