You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2019/10/09 19:58:27 UTC

[isis] branch v2 updated: ISIS-2158: extend spec-loading, also allow for concurrent validation

This is an automated email from the ASF dual-hosted git repository.

ahuber pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/isis.git


The following commit(s) were added to refs/heads/v2 by this push:
     new 04f0485  ISIS-2158: extend spec-loading, also allow for concurrent validation
04f0485 is described below

commit 04f04859c68a5f7f8db1bd01ba7341d6a3ff4222
Author: Andi Huber <ah...@apache.org>
AuthorDate: Wed Oct 9 21:58:19 2019 +0200

    ISIS-2158: extend spec-loading, also allow for concurrent validation
    
    - as an optimization step we also introduce a new, highly specialized
    list type '_VersionedList'
    - _VersionedList allows iterating over its elements while elements are
    concurrently added to the list
    - a special traversal function 'forEach(action)' quickly traverses all
    elements, including those that get added during traversal
---
 .../collections/snapshot/_VersionedList.java       | 217 +++++++++++++++++++++
 .../collections/snapshot/VersionedListTest.java    |  69 +++++++
 .../specloader/SpecificationCacheDefault.java      |  21 +-
 .../metamodel/specloader/SpecificationLoader.java  |  11 ++
 .../specloader/SpecificationLoaderDefault.java     |  13 ++
 .../validator/MetaModelValidatorVisiting.java      |  50 ++++-
 6 files changed, 370 insertions(+), 11 deletions(-)

diff --git a/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java b/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java
new file mode 100644
index 0000000..becad61
--- /dev/null
+++ b/core/commons/src/main/java/org/apache/isis/commons/internal/collections/snapshot/_VersionedList.java
@@ -0,0 +1,217 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.isis.commons.internal.collections.snapshot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.isis.commons.internal.base._With.requires;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.val;
+
+/**
+ * Thread-safe pseudo list, that increments its version each time a snapshot is requested. 
+ * <p>
+ * This allows to easily keep track of any additions to the list that occurred in between 
+ * snapshots.
+ *  
+ * @since 2.0
+ * @param <T>
+ */
+public final class _VersionedList<T> {
+
+    private UUID uuid = UUID.randomUUID();
+    private final List<List<T>> versions = new ArrayList<>();
+    private List<T> currentVersion = new ArrayList<>();
+    private int size;
+
+    @AllArgsConstructor(access = AccessLevel.PRIVATE)
+    public final static class Snapshot<T> {
+        private UUID ownerUuid;
+        @SuppressWarnings("unused")
+        private final int fromIndex; //low endpoint (inclusive) of the copy
+        private final int toIndex; // high endpoint (exclusive) of the copy
+        private final List<List<T>> versions;
+
+        public boolean isEmpty() {
+            return versions.isEmpty();
+        }
+        
+        public Stream<T> stream() {
+            return versions.stream()
+                    .flatMap(List::stream);
+        }
+        
+        public void forEach(Consumer<T> action) {
+            for(val ver : versions) {
+                for(val element : ver) {
+                    action.accept(element);
+                }
+            }
+        }
+        
+        public void forEachParallel(Consumer<T> action) {
+            for(val ver : versions) {
+                if(ver.size()>8) {
+                    ver.parallelStream().forEach(action);
+                } else {
+                    for(val element : ver) {
+                        action.accept(element);
+                    }
+                }
+            }
+        }
+        
+
+    }
+
+    public Snapshot<T> snapshot() {
+        synchronized(versions) {
+            commit();
+            return new Snapshot<>(uuid, 0, versions.size(), defensiveCopy());
+        }
+    }
+
+    public Snapshot<T> deltaSince(Snapshot<T> snapshot) {
+
+        requires(snapshot, "snapshot");
+
+        if(snapshot.ownerUuid!=uuid) {
+            throw new IllegalArgumentException("Snapshot's UUID is different from the VersionedList's.");
+        }
+
+        synchronized(versions) {
+            commit();
+            val from = snapshot.toIndex;
+            val to = versions.size();
+            return new Snapshot<>(uuid, from, to, defensiveCopy(from, to));
+        }
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public boolean isEmpty() {
+        return size==0;
+    }
+
+    /** current implementation cannot handle concurrent additions that occur during traversal*/
+    public Stream<T> stream() {
+        final List<List<T>> defensiveCopy;
+        synchronized(versions) {
+            commit();
+            defensiveCopy = defensiveCopy();
+        }
+        return defensiveCopy.stream()
+                .flatMap(List::stream);
+    }
+
+    public boolean add(T e) {
+        synchronized(versions) {
+            ++size;
+            return currentVersion.add(e);
+        }
+    }
+
+    public boolean addAll(Collection<? extends T> c) {
+        synchronized(versions) {
+            size+=c.size();
+            return currentVersion.addAll(c);
+        }
+    }
+
+    public void clear() {
+        synchronized(versions) {
+            uuid = UUID.randomUUID();
+            size=0;
+            versions.clear();
+            currentVersion.clear();
+        }
+    }
+    
+    /**
+     * Also handles concurrent additions that occur during traversal. 
+     * @param action
+     */
+    public void forEach(Consumer<T> action) {
+        val snapshot = snapshot();
+        snapshot.forEach(action);
+        Snapshot<T> delta = deltaSince(snapshot);
+        while(!delta.isEmpty()) {
+            delta.forEach(action);
+            delta = deltaSince(delta);
+        }
+    }
+
+    /**
+     * Also handles concurrent additions that occur during traversal. 
+     * @param action
+     */
+    public void forEachParallel(Consumer<T> action) {
+        val snapshot = snapshot();
+        snapshot.forEachParallel(action);
+        Snapshot<T> delta = deltaSince(snapshot);
+        while(!delta.isEmpty()) {
+            delta.forEachParallel(action);
+            delta = deltaSince(delta);
+        }
+    }
+    
+
+    // -- HELPER
+
+
+    /** 
+     * @implNote only call within synchronized block! 
+     * @param fromIndex low endpoint (inclusive) of the copy
+     * @param toIndex high endpoint (exclusive) of the copy
+     */
+    private List<List<T>> defensiveCopy(int fromIndex, int toIndex) {
+        if(fromIndex==toIndex) {
+            return Collections.emptyList();
+        }
+        return new ArrayList<>(versions.subList(fromIndex, toIndex));
+    }
+
+    /** @implNote only call within synchronized block! */
+    private List<List<T>> defensiveCopy() {
+        if(versions.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return new ArrayList<>(versions);
+    }
+
+    /** @implNote only call within synchronized block! */
+    private void commit() {
+        if(!currentVersion.isEmpty()) {
+            versions.add(currentVersion);
+            currentVersion = new ArrayList<>(); // create a new array for others to write to next    
+        }
+    }
+
+
+}
diff --git a/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java b/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java
new file mode 100644
index 0000000..d659f22
--- /dev/null
+++ b/core/commons/src/test/java/org/apache/isis/commons/internal/collections/snapshot/VersionedListTest.java
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.isis.commons.internal.collections.snapshot;
+
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import lombok.val;
+
+class VersionedListTest {
+
+    @Test
+    void test() {
+        val list = new _VersionedList<String>();
+
+        // a fresh list holds nothing
+        assertEquals(0, list.size());
+        assertTrue(list.isEmpty());
+
+        list.add("foo");
+        assertEquals(1, list.size());
+        assertFalse(list.isEmpty());
+
+        // the first snapshot sees the single element added so far
+        val afterFoo = list.snapshot();
+        assertEquals(1, afterFoo.stream().count());
+
+        list.add("bar");
+
+        // a full snapshot yields all elements in insertion order
+        val afterBar = list.snapshot();
+        assertEquals(2, afterBar.stream().count());
+        assertEquals("foo,bar", afterBar.stream().collect(Collectors.joining(",")));
+
+        // the delta yields only what got added after the first snapshot
+        val sinceFoo = list.deltaSince(afterFoo);
+        assertEquals(1, sinceFoo.stream().count());
+        assertEquals("bar", sinceFoo.stream().collect(Collectors.joining(",")));
+
+        list.add("gru");
+        val afterGru = list.snapshot();
+        assertEquals("foo,bar,gru", afterGru.stream().collect(Collectors.joining(",")));
+
+        // nothing got added since the latest snapshot, hence the delta is empty
+        assertTrue(list.deltaSince(afterGru).isEmpty());
+    }
+
+}
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
index 66aa7c9..47720e3 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationCacheDefault.java
@@ -25,10 +25,13 @@ import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.isis.commons.internal.collections._Maps;
+import org.apache.isis.commons.internal.collections.snapshot._VersionedList;
 import org.apache.isis.metamodel.facets.object.objectspecid.ObjectSpecIdFacet;
 import org.apache.isis.metamodel.spec.ObjectSpecId;
 import org.apache.isis.metamodel.spec.ObjectSpecification;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.val;
 
 
@@ -37,6 +40,10 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
     private final Map<String, T> specByClassName = _Maps.newHashMap();
     private final Map<ObjectSpecId, String> classNameBySpecId = _Maps.newHashMap();
     
+    // optimization: specialized list to keep track of any additions to the cache fast
+    @Getter(value = AccessLevel.PACKAGE)
+    private final _VersionedList<T> vList = new _VersionedList<>(); 
+    
     public T get(String className) {
         return specByClassName.get(className);
     }
@@ -54,8 +61,11 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
     }
 
     public void clear() {
-        specByClassName.clear();
-        classNameBySpecId.clear();
+        synchronized(this) {
+            specByClassName.clear();
+            classNameBySpecId.clear();
+            vList.clear(); // assigns a new uuid, snapshots taken earlier get rejected by deltaSince(..)
+        }
     }
 
     /** @returns thread-safe defensive copy */
@@ -64,7 +74,7 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
             return new ArrayList<T>(specByClassName.values());
         }
     }
-
+    
     public T getByObjectType(final ObjectSpecId objectSpecID) {
         val className = classNameBySpecId.get(objectSpecID);
         return className != null ? specByClassName.get(className) : null;
@@ -76,7 +86,10 @@ class SpecificationCacheDefault<T extends ObjectSpecification> {
         }
         val className = spec.getCorrespondingClass().getName();
         val specId = spec.getSpecId();
-        specByClassName.put(className, spec);
+        val existing = specByClassName.put(className, spec);
+        if(existing==null) {
+            vList.add(spec); // add to vList only if we don't have it already
+        }
         if (specId == null) {
             return;
         }
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
index 152f5a2..dd871e1 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoader.java
@@ -19,6 +19,7 @@
 package org.apache.isis.metamodel.specloader;
 
 import java.util.Collection;
+import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 
@@ -71,6 +72,14 @@ public interface SpecificationLoader {
      * @return snapshot of all the (currently) loaded specifications, a defensive-copy 
      */
     Collection<ObjectSpecification> snapshotSpecifications();
+    
+    /**
+     * Similar to {@link #snapshotSpecifications()}, but also handles concurrent additions that occur
+     * during traversal.
+     *
+     * @param onSpec callback that receives each loaded specification
+     */
+    void forEach(Consumer<ObjectSpecification> onSpec);
 
     /**
      * Lookup a specification that has bean loaded before.
@@ -118,4 +127,6 @@ public interface SpecificationLoader {
 
 
 
+
+
 }
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
index 6dcf67a..c260fe5 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/SpecificationLoaderDefault.java
@@ -20,6 +20,7 @@ package org.apache.isis.metamodel.specloader;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
@@ -287,6 +288,18 @@ public class SpecificationLoaderDefault implements SpecificationLoader {
     public Collection<ObjectSpecification> snapshotSpecifications() {
         return cache.snapshotSpecs();
     }
+    
+    @Override
+    public void forEach(Consumer<ObjectSpecification> onSpec) {
+        // traversal mode is driven by configuration (the introspector's 'parallelize' flag)
+        val parallelize = isisConfiguration.getReflector().getIntrospector().isParallelize();
+        val specs = cache.getVList(); // vList is thread-safe
+        if(!parallelize) {
+            specs.forEach(onSpec);
+            return;
+        }
+        specs.forEachParallel(onSpec);
+    }
 
     @Override
     public ObjectSpecification lookupBySpecIdElseLoad(ObjectSpecId objectSpecId) {
diff --git a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
index 84b2aa2..a6b568b 100644
--- a/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
+++ b/core/metamodel/src/main/java/org/apache/isis/metamodel/specloader/validator/MetaModelValidatorVisiting.java
@@ -21,10 +21,13 @@ package org.apache.isis.metamodel.specloader.validator;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.isis.commons.internal.collections._Lists;
+import org.apache.isis.commons.internal.debug._Probe;
 import org.apache.isis.metamodel.MetaModelContext;
 import org.apache.isis.metamodel.spec.ObjectSpecification;
+import org.apache.isis.metamodel.specloader.SpecificationLoaderDefault;
 
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
@@ -50,17 +53,38 @@ public class MetaModelValidatorVisiting extends MetaModelValidatorAbstract {
     @Override
     public void collectFailuresInto(@NonNull ValidationFailures validationFailures) {
         validateAll();
+        //validateAllLegacy(); // legacy traversal, disabled in favor of validateAll(); see TODO[2158]
         summarize();
         super.collectFailuresInto(validationFailures);
     }
 
+    private static _Probe probe = _Probe.unlimited().label("spec");
+    
     private void validateAll() {
 
-        val specsValidated = _Lists.<ObjectSpecification>newArrayList();
+        // forEach(..) is declared on the SpecificationLoader interface, hence no down-cast needed
+        val specLoader = MetaModelContext.current().getSpecificationLoader();
+        val ladd = new LongAdder(); // counts visited specs, safe to use with parallel traversal
+        specLoader.forEach(spec->{
+            ladd.increment();
+            visitor.visit(spec, this);
+        });
+        // NOTE(review): visit(..) result is ignored; the legacy loop stopped when it returned false
+        //probe.println("specsToValidate " + ladd.longValue());
+
+    }
+    
+    //TODO[2158] cleanup legacy
+    // legacy traversal: calls validateSpecs(..) repeatedly until it reports nothing new to validate
+    private void validateAllLegacy() {
 
+        val specsValidated = _Lists.<ObjectSpecification>newArrayList();
+        
         while(validateSpecs(specsValidated)) {
             // validate in a loop, because the act of validating might cause additional specs to be uncovered
         }
+        
+        probe.println("specsToValidate " + specsValidated.size());
 
     }
 
@@ -76,20 +100,32 @@ public class MetaModelValidatorVisiting extends MetaModelValidatorAbstract {
 
         // don't validate any specs already processed
         specsToValidate.removeAll(specsAlreadyValidated);
+        
         if(specsToValidate.isEmpty()) {
             // don't call us again
             return false;
         }
 
+        
         // validate anything new
-        for (val objSpec : specsToValidate) {
-            if(!visitor.visit(objSpec, this)) {
-                break;
-            }
-        }
-
+        
         // add the new specs just validated to the list (for next time)
         specsAlreadyValidated.addAll(specsToValidate);
+        
+        val isConcurrentFromConfig = getConfiguration().getReflector().getIntrospector().isParallelize();
+        val runSequential = !isConcurrentFromConfig;
+        if(runSequential) { 
+            
+            for (val spec : specsToValidate) {
+                if(!visitor.visit(spec, this)) {
+                    break;
+                }
+            }
+
+        } else {
+            specsToValidate.parallelStream()
+            .forEach(spec -> visitor.visit(spec, this));
+        }
 
         // go round the loop again
         return true;