You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by aj...@apache.org on 2018/01/06 15:17:43 UTC

[19/44] jena git commit: Further simplifying new Collector API

Further simplifying new Collector API


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f68776bc
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f68776bc
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f68776bc

Branch: refs/heads/master
Commit: f68776bc8672d010bb4bade41da6df40fb9d3b28
Parents: d4bcdbc
Author: ajs6f <aj...@apache.org>
Authored: Fri Nov 17 12:05:18 2017 -0500
Committer: ajs6f <aj...@apache.org>
Committed: Fri Jan 5 09:26:07 2018 -0500

----------------------------------------------------------------------
 .../jena/query/util/DatasetCollector.java       |  77 +++-
 .../query/util/DatasetIntoDatasetCollector.java |  42 ---
 .../org/apache/jena/query/util/DatasetLib.java  |  32 +-
 .../query/util/ModelIntoDatasetCollector.java   |  53 ---
 .../sparql/util/DifferenceDatasetGraph.java     |  11 +-
 .../jena/sparql/util/ImmutableDatasetGraph.java |  14 +
 .../sparql/util/IntersectionDatasetGraph.java   |   4 +-
 .../jena/sparql/util/UnionDatasetGraph.java     |  13 +-
 .../jena/sparql/util/ViewDatasetGraph.java      | 364 ++++++++++---------
 .../org/apache/jena/atlas/iterator/Iter.java    |   6 +-
 .../jena/atlas/lib/IdentityFinishCollector.java |  25 ++
 .../java/org/apache/jena/atlas/lib/Pair.java    |   9 +-
 .../org/apache/jena/util/ModelCollector.java    |  58 ++-
 .../jena/util/ModelIntoModelCollector.java      |  37 --
 .../jena/util/StatementIntoModelCollector.java  |  38 --
 15 files changed, 394 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
index 6125793..8779c4c 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
+++ b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
@@ -1,26 +1,85 @@
 package org.apache.jena.query.util;
 
+import static org.apache.jena.atlas.iterator.Iter.filter;
+import static org.apache.jena.system.Txn.executeRead;
+import static org.apache.jena.system.Txn.executeWrite;
+
+import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
-import org.apache.jena.atlas.lib.IdentityFinishCollector;
+import org.apache.jena.atlas.lib.IdentityFinishCollector.UnorderedIdentityFinishCollector;
 import org.apache.jena.query.Dataset;
 import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.sparql.util.Context;
+import org.apache.jena.rdf.model.Model;
 
-public interface DatasetCollector<T> extends IdentityFinishCollector<T, Dataset> {
+public abstract class DatasetCollector implements UnorderedIdentityFinishCollector<Dataset, Dataset> {
 
     @Override
-    default Supplier<Dataset> supplier() {
+    public Supplier<Dataset> supplier() {
         return DatasetFactory::createGeneral;
     }
 
-    @Override
-    default BinaryOperator<Dataset> combiner() {
-        return DatasetCollector::union;
+    public ConcurrentDatasetCollector concurrent() {
+        return new ConcurrentDatasetCollector(this);
+    }
+
+    /**
+     * Use only with {@link Dataset}s that support transactions.
+     */
+    public static class ConcurrentDatasetCollector extends DatasetCollector
+            implements ConcurrentUnorderedIdentityFinishCollector<Dataset, Dataset> {
+
+        private final DatasetCollector collector;
+
+        public ConcurrentDatasetCollector(DatasetCollector col) {
+            this.collector = col;
+        }
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return collector.combiner();
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> executeRead(d2, () -> executeWrite(d1, () -> collector.accumulator().accept(d1, d2)));
+        }
     }
 
-    static Dataset union(final Dataset d1, final Dataset d2) {
-        return DatasetLib.union(d1, d2, Context.emptyContext);
+    public static class UnionDatasetCollector extends DatasetCollector {
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return DatasetLib::union;
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> {
+                d1.getDefaultModel().add(d2.getDefaultModel());
+                d2.listNames().forEachRemaining(
+                        name -> d1.replaceNamedModel(name, d1.getNamedModel(name).union(d2.getNamedModel(name))));
+            };
+        }
+    }
+
+    public static class IntersectionDatasetCollector extends DatasetCollector {
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return DatasetLib::intersection;
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> {
+                d1.setDefaultModel(d1.getDefaultModel().intersection(d2.getDefaultModel()));
+                filter(d2.listNames(), d1::containsNamedModel).forEachRemaining(name -> {
+                    Model intersection = d1.getNamedModel(name).intersection(d2.getNamedModel(name));
+                    d1.replaceNamedModel(name, intersection);
+                });
+            };
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/query/util/DatasetIntoDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetIntoDatasetCollector.java b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetIntoDatasetCollector.java
deleted file mode 100644
index 64af22d..0000000
--- a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetIntoDatasetCollector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.jena.query.util;
-
-import static java.util.stream.Collector.Characteristics.CONCURRENT;
-import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
-import static java.util.stream.Collector.Characteristics.UNORDERED;
-import static org.apache.jena.system.Txn.executeRead;
-import static org.apache.jena.system.Txn.executeWrite;
-
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import org.apache.jena.ext.com.google.common.collect.ImmutableSet;
-import org.apache.jena.query.Dataset;
-
-public class DatasetIntoDatasetCollector implements DatasetCollector<Dataset> {
-
-    @Override
-    public BiConsumer<Dataset, Dataset> accumulator() {
-        return (d1, d2) -> {
-            d1.getDefaultModel().add(d2.getDefaultModel());
-            d2.listNames().forEachRemaining(name -> d1.getNamedModel(name).add(d2.getNamedModel(name)));
-        };
-    }
-
-    @Override
-    public Set<Characteristics> characteristics() {
-        return ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
-    }
-
-    public static class ConcurrentStatementIntoModelCollector extends DatasetIntoDatasetCollector {
-
-        @Override
-        public BiConsumer<Dataset, Dataset> accumulator() {
-            return (d1, d2) -> executeRead(d2, () -> executeWrite(d1, () -> super.accumulator().accept(d1, d2)));
-        }
-
-        @Override
-        public Set<Characteristics> characteristics() {
-            return ImmutableSet.of(UNORDERED, IDENTITY_FINISH, CONCURRENT);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
index 4ac9482..82ef4f7 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
@@ -1,5 +1,7 @@
 package org.apache.jena.query.util;
 
+import static org.apache.jena.sparql.util.Context.emptyContext;
+
 import org.apache.jena.query.Dataset;
 import org.apache.jena.query.DatasetFactory;
 import org.apache.jena.sparql.util.Context;
@@ -9,15 +11,27 @@ import org.apache.jena.sparql.util.UnionDatasetGraph;
 
 public class DatasetLib {
 
-	public static Dataset union(final Dataset d1, final Dataset d2, Context c) {
-		return DatasetFactory.wrap(new UnionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-	}
+    public static Dataset union(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new UnionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
+
+    public static Dataset union(final Dataset d1, final Dataset d2) {
+        return union(d1, d2, emptyContext);
+    }
+
+    public static Dataset intersection(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new IntersectionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
 
-	public static Dataset intersection(final Dataset d1, final Dataset d2, Context c) {
-		return DatasetFactory.wrap(new IntersectionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-	}
+    public static Dataset intersection(final Dataset d1, final Dataset d2) {
+        return intersection(d1, d2, emptyContext);
+    }
 
-	public static Dataset difference(final Dataset d1, final Dataset d2, Context c) {
-		return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-	}
+    public static Dataset difference(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
+    
+    public static Dataset difference(final Dataset d1, final Dataset d2) {
+        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), emptyContext));
+    }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/query/util/ModelIntoDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/ModelIntoDatasetCollector.java b/jena-arq/src/main/java/org/apache/jena/query/util/ModelIntoDatasetCollector.java
deleted file mode 100644
index 290c760..0000000
--- a/jena-arq/src/main/java/org/apache/jena/query/util/ModelIntoDatasetCollector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.jena.query.util;
-
-import static java.util.stream.Collector.Characteristics.CONCURRENT;
-import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
-import static java.util.stream.Collector.Characteristics.UNORDERED;
-import static org.apache.jena.system.Txn.executeWrite;
-
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import org.apache.jena.ext.com.google.common.collect.ImmutableSet;
-import org.apache.jena.query.Dataset;
-import org.apache.jena.rdf.model.Model;
-import org.apache.jena.sparql.core.Quad;
-
-public class ModelIntoDatasetCollector implements DatasetCollector<Model> {
-
-    private String graphName;
-
-    public ModelIntoDatasetCollector(String graphName) {
-        this.graphName = graphName;
-    }
-
-    /**
-     * Collects models into the default graph.
-     */
-    public ModelIntoDatasetCollector() {
-        this(Quad.defaultGraphIRI.getURI());
-    }
-
-    @Override
-    public BiConsumer<Dataset, Model> accumulator() {
-        return (d, m) -> d.getNamedModel(graphName).add(m);
-    }
-
-    @Override
-    public Set<Characteristics> characteristics() {
-        return ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
-    }
-
-    public static class ConcurrentStatementIntoModelCollector extends ModelIntoDatasetCollector {
-
-        @Override
-        public BiConsumer<Dataset, Model> accumulator() {
-            return (d, m) -> m.executeInTxn(() -> executeWrite(d, () -> super.accumulator().accept(d, m)));
-        }
-
-        @Override
-        public Set<Characteristics> characteristics() {
-            return ImmutableSet.of(UNORDERED, IDENTITY_FINISH, CONCURRENT);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/sparql/util/DifferenceDatasetGraph.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/DifferenceDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/DifferenceDatasetGraph.java
index bf51da8..8c27f81 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/util/DifferenceDatasetGraph.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/DifferenceDatasetGraph.java
@@ -4,6 +4,7 @@ import static org.apache.jena.sparql.core.Quad.ANY;
 import static org.apache.jena.sparql.core.Quad.isDefaultGraph;
 
 import java.util.Iterator;
+import java.util.function.Function;
 
 import org.apache.jena.graph.Graph;
 import org.apache.jena.graph.Node;
@@ -15,10 +16,14 @@ public class DifferenceDatasetGraph extends ViewDatasetGraph {
 	public DifferenceDatasetGraph(DatasetGraph left, DatasetGraph right, Context c) {
 		super(left, right, c);
 	}
+	
+	private Graph difference(Function<DatasetGraph, Graph> op) {
+	    return join(Difference::new, op);
+	}
 
 	@Override
 	public Graph getDefaultGraph() {
-		return new Difference(getRight().getDefaultGraph(), getLeft().getDefaultGraph());
+		return difference(DatasetGraph::getDefaultGraph);
 	}
 
 	@Override
@@ -26,7 +31,7 @@ public class DifferenceDatasetGraph extends ViewDatasetGraph {
 		return isDefaultGraph(graphNode)
 				? getDefaultGraph()
 				: getRight().containsGraph(graphNode)
-						? new Difference(getLeft().getGraph(graphNode), getRight().getGraph(graphNode))
+						? difference(dsg -> dsg.getGraph(graphNode))
 						: getLeft().getGraph(graphNode);
 	}
 
@@ -42,7 +47,7 @@ public class DifferenceDatasetGraph extends ViewDatasetGraph {
 
 	@Override
 	public boolean contains(Node g, Node s, Node p, Node o) {
-		return getLeft().contains(g, s, p, o) && !getRight().contains(g, s, p, o);
+	    return both(dsg -> dsg.contains(g, s, p, o));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/sparql/util/ImmutableDatasetGraph.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/ImmutableDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/ImmutableDatasetGraph.java
new file mode 100644
index 0000000..fe79a8e
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/ImmutableDatasetGraph.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.util;
+
+import java.util.Iterator;
+
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.shared.Lock;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.Quad;
+
+public abstract class ImmutableDatasetGraph implements DatasetGraph {
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/sparql/util/IntersectionDatasetGraph.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/IntersectionDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/IntersectionDatasetGraph.java
index 3d845b5..d590d50 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/util/IntersectionDatasetGraph.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/IntersectionDatasetGraph.java
@@ -16,8 +16,8 @@ public class IntersectionDatasetGraph extends ViewDatasetGraph {
 		super(left, right, c);
 	}
 
-	Graph intersect(Function<DatasetGraph, Graph> mapping) {
-		return new Intersection(mapping.apply(getLeft()), mapping.apply(getRight()));
+	Graph intersect(Function<DatasetGraph, Graph> op) {
+	    return join(Intersection::new, op);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/sparql/util/UnionDatasetGraph.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/UnionDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/UnionDatasetGraph.java
index 16d18be..516263b 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/util/UnionDatasetGraph.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/UnionDatasetGraph.java
@@ -3,7 +3,7 @@ package org.apache.jena.sparql.util;
 import java.util.Iterator;
 import java.util.function.Function;
 
-import org.apache.jena.ext.com.google.common.collect.Iterators;
+import org.apache.jena.atlas.iterator.Iter;
 import org.apache.jena.graph.Graph;
 import org.apache.jena.graph.Node;
 import org.apache.jena.graph.compose.Union;
@@ -17,11 +17,11 @@ public class UnionDatasetGraph extends ViewDatasetGraph {
     }
 
     private Graph union(Function<DatasetGraph, Graph> op) {
-        return new Union(op.apply(getLeft()), op.apply(getRight()));
+        return join(Union::new, op);
     }
 
-    <T> Iterator<T> fromEach(Function<DatasetGraph, Iterator<T>> op) {
-        return Iterators.concat(op.apply(getLeft()), op.apply(getRight()));
+    <T> Iter<T> fromEach(Function<DatasetGraph, Iterator<T>> op) {
+        return join(Iter::concat, op).distinct();
     }
 
     @Override
@@ -68,9 +68,4 @@ public class UnionDatasetGraph extends ViewDatasetGraph {
     public boolean isEmpty() {
         return both(DatasetGraph::isEmpty);
     }
-
-    @Override
-    public long size() {
-        return getLeft().size() + getRight().size();
-    }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-arq/src/main/java/org/apache/jena/sparql/util/ViewDatasetGraph.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/ViewDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/ViewDatasetGraph.java
index cb48560..24e0769 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/util/ViewDatasetGraph.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/ViewDatasetGraph.java
@@ -3,8 +3,8 @@ package org.apache.jena.sparql.util;
 import static org.apache.jena.atlas.iterator.Iter.count;
 import static org.apache.jena.atlas.iterator.Iter.map;
 import static org.apache.jena.ext.com.google.common.collect.Iterators.concat;
-import static org.apache.jena.query.ReadWrite.WRITE;
-import static org.apache.jena.sparql.core.Quad.ANY;
+import static org.apache.jena.graph.Node.ANY;
+import static org.apache.jena.query.ReadWrite.READ;
 import static org.apache.jena.sparql.core.Quad.defaultGraphIRI;
 import static org.apache.jena.sparql.util.graph.GraphUtils.triples2quads;
 
@@ -21,178 +21,190 @@ import org.apache.jena.sparql.core.Quad;
 
 public abstract class ViewDatasetGraph extends Pair.OfSameType<DatasetGraph> implements DatasetGraph {
 
-	private final Context context;
-
-	private final Lock lock;
-
-	public ViewDatasetGraph(DatasetGraph left, DatasetGraph right, Context c) {
-		super(left, right);
-		this.context = c;
-		this.lock = new PairLock(left.getLock(), right.getLock());
-	}
-
-	private void noMutation() {
-		throw new UnsupportedOperationException("Views do not allow mutation!");
-	}
-
-	@Override
-	public void commit() {
-		noMutation();
-	}
-
-
-	@Override
-	public void begin(ReadWrite readWrite) {
-		if (readWrite.equals(WRITE)) noMutation();
-		forEach(dsg -> dsg.begin(readWrite));
-	}
-
-	@Override
-	public void abort() {
-		noMutation();
-	}
-
-	@Override
-	public void end() {
-		forEach(DatasetGraph::end);
-	}
-
-	@Override
-	public boolean isInTransaction() {
-		return either(DatasetGraph::isInTransaction);
-	}
-
-	@Override
-	public void setDefaultGraph(Graph g) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void addGraph(Node graphName, Graph graph) {
-		noMutation();
-	}
-
-	@Override
-	public void removeGraph(Node graphName) {
-		noMutation();
-	}
-
-	@Override
-	public void add(Quad quad) {
-		noMutation();
-	}
-
-	@Override
-	public void delete(Quad quad) {
-		noMutation();
-	}
-
-	@Override
-	public void add(Node g, Node s, Node p, Node o) {
-		noMutation();
-	}
-
-	@Override
-	public void delete(Node g, Node s, Node p, Node o) {
-		noMutation();
-	}
-
-	@Override
-	public void deleteAny(Node g, Node s, Node p, Node o) {
-		noMutation();
-	}
-
-	@Override
-	public void clear() {
-		noMutation();
-	}
-
-	@Override
-	public Iterator<Quad> find() {
-		return find(ANY);
-	}
-
-	@Override
-	public Iterator<Quad> find(Quad q) {
-		return find(q.getGraph(), q.getSubject(), q.getPredicate(), q.getObject());
-	}
-
-	@Override
-	public Iterator<Quad> find(Node g, Node s, Node p, Node o) {
-		return g.isConcrete()
-				? findInOneGraph(g, s, p, o)
-				: concat(findNG(null, s, p, o), findInOneGraph(defaultGraphIRI, s, p, o));
-	}
-
-	@Override
-	public Iterator<Quad> findNG(Node g, Node s, Node p, Node o) {
-		return g.isConcrete()
-				? findInOneGraph(g, s, p, o)
-				: concat(map(listGraphNodes(), gn -> findInOneGraph(gn, s, p, o)));
-	}
-
-	protected Iterator<Quad> findInOneGraph(Node g, Node s, Node p, Node o) {
-		return triples2quads(g, getGraph(g).find(s, p, o));
-	}
-
-	@Override
-	public Graph getUnionGraph() {
-		return new MultiUnion(map(listGraphNodes(), this::getGraph));
-	}
-
-	@Override
-	public boolean contains(Quad q) {
-		return contains(q.getGraph(), q.getSubject(), q.getPredicate(), q.getObject());
-	}
-
-	@Override
-	public Lock getLock() {
-		return lock;
-	}
-
-	@Override
-	public Context getContext() {
-		return context;
-	}
-
-	@Override
-	public void close() {
-	}
-
-	@Override
-	public boolean supportsTransactions() {
-		return both(DatasetGraph::supportsTransactions);
-	}
-
-	@Override
-	public boolean supportsTransactionAbort() {
-		return false;
-	}
-
-	@Override
-	public long size() {
-		return count(listGraphNodes());
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return listGraphNodes().hasNext();
-	}
-
-	private static class PairLock extends Pair.OfSameType<Lock> implements Lock {
-
-		public PairLock(Lock left, Lock right) {
-			super(left, right);
-		}
-
-		@Override
-		public void enterCriticalSection(boolean readLockRequested) {
-			forEach(lock -> lock.enterCriticalSection(readLockRequested));
-		}
-
-		@Override
-		public void leaveCriticalSection() {
-			forEach(Lock::leaveCriticalSection);
-		}
-	}
+    private Context context;
+
+    private final Lock lock;
+
+    public ViewDatasetGraph(DatasetGraph left, DatasetGraph right, Context c) {
+        super(left, right);
+        this.context = c;
+        this.lock = new PairLock(left.getLock(), right.getLock());
+    }
+
+    private static void throwNoMutationAllowed() {
+        throw new UnsupportedOperationException("This view does not allow mutation!");
+    }
+
+    @Override
+    public void commit() {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void begin(ReadWrite readWrite) {
+        switch (readWrite) {
+        case WRITE:
+            throwNoMutationAllowed();
+        case READ:
+            forEach(ViewDatasetGraph::beginRead);
+        }
+    }
+    
+    private static void beginRead(DatasetGraph dsg) {
+        dsg.begin(READ);
+    }
+
+    @Override
+    public void abort() {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void end() {
+        forEach(DatasetGraph::end);
+    }
+
+    @Override
+    public boolean isInTransaction() {
+        return either(DatasetGraph::isInTransaction);
+    }
+
+    @Override
+    public void setDefaultGraph(Graph g) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void addGraph(Node graphName, Graph graph) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void removeGraph(Node graphName) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void add(Quad quad) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void delete(Quad quad) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void add(Node g, Node s, Node p, Node o) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void delete(Node g, Node s, Node p, Node o) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void deleteAny(Node g, Node s, Node p, Node o) {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public void clear() {
+        throwNoMutationAllowed();
+    }
+
+    @Override
+    public Iterator<Quad> find() {
+        return find(Quad.ANY);
+    }
+
+    @Override
+    public Iterator<Quad> find(Quad q) {
+        return find(q.getGraph(), q.getSubject(), q.getPredicate(), q.getObject());
+    }
+
+    @Override
+    public Iterator<Quad> find(Node g, Node s, Node p, Node o) {
+        return g.isConcrete()
+                ? findInOneGraph(g, s, p, o)
+                : concat(findNG(ANY, s, p, o), findInOneGraph(defaultGraphIRI, s, p, o));
+    }
+
+    @Override
+    public Iterator<Quad> findNG(Node g, Node s, Node p, Node o) {
+        return g.isConcrete()
+                ? findInOneGraph(g, s, p, o)
+                : concat(map(listGraphNodes(), gn -> findInOneGraph(gn, s, p, o)));
+    }
+
+    protected Iterator<Quad> findInOneGraph(Node g, Node s, Node p, Node o) {
+        return triples2quads(g, getGraph(g).find(s, p, o));
+    }
+
+    @Override
+    public Graph getUnionGraph() {
+        return new MultiUnion(map(listGraphNodes(), this::getGraph));
+    }
+
+    @Override
+    public boolean contains(Quad q) {
+        return contains(q.getGraph(), q.getSubject(), q.getPredicate(), q.getObject());
+    }
+
+    @Override
+    public Lock getLock() {
+        return lock;
+    }
+
+    @Override
+    public Context getContext() {
+        return context;
+    }
+
+    public DatasetGraph setContext(Context c) {
+        this.context = c;
+        return this;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean supportsTransactions() {
+        return both(DatasetGraph::supportsTransactions);
+    }
+
+    @Override
+    public boolean supportsTransactionAbort() {
+        return false;
+    }
+
+    @Override
+    public long size() {
+        return count(listGraphNodes());
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return listGraphNodes().hasNext();
+    }
+
+    private static class PairLock extends Pair.OfSameType<Lock> implements Lock {
+
+        public PairLock(Lock left, Lock right) {
+            super(left, right);
+        }
+
+        @Override
+        public void enterCriticalSection(boolean readLockRequested) {
+            forEach(lock -> lock.enterCriticalSection(readLockRequested));
+        }
+
+        @Override
+        public void leaveCriticalSection() {
+            forEach(Lock::leaveCriticalSection);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-base/src/main/java/org/apache/jena/atlas/iterator/Iter.java
----------------------------------------------------------------------
diff --git a/jena-base/src/main/java/org/apache/jena/atlas/iterator/Iter.java b/jena-base/src/main/java/org/apache/jena/atlas/iterator/Iter.java
index 2dbc734..cb9871b 100644
--- a/jena-base/src/main/java/org/apache/jena/atlas/iterator/Iter.java
+++ b/jena-base/src/main/java/org/apache/jena/atlas/iterator/Iter.java
@@ -614,11 +614,11 @@ public class Iter<T> implements Iterator<T> {
     /** An {@code Iterator} of 2 {@code Iterator}'s.
      * See also {@link IteratorConcat}.
      */
-    public static <T> Iterator<T> concat(Iterator<T> iter1, Iterator<T> iter2) {
+    public static <T> Iter<T> concat(Iterator<T> iter1, Iterator<T> iter2) {
         if ( iter1 == null )
-            return iter2 ;
+            return iter(iter2) ;
         if ( iter2 == null )
-            return iter1 ;
+            return iter(iter1) ;
         return iter(iter1).append(iter(iter2)) ;
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-base/src/main/java/org/apache/jena/atlas/lib/IdentityFinishCollector.java
----------------------------------------------------------------------
diff --git a/jena-base/src/main/java/org/apache/jena/atlas/lib/IdentityFinishCollector.java b/jena-base/src/main/java/org/apache/jena/atlas/lib/IdentityFinishCollector.java
index 4c801c5..2386b4b 100644
--- a/jena-base/src/main/java/org/apache/jena/atlas/lib/IdentityFinishCollector.java
+++ b/jena-base/src/main/java/org/apache/jena/atlas/lib/IdentityFinishCollector.java
@@ -1,12 +1,37 @@
 package org.apache.jena.atlas.lib;
 
+import static java.util.stream.Collector.Characteristics.CONCURRENT;
+import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
+import static java.util.stream.Collector.Characteristics.UNORDERED;
+
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
 
+import org.apache.jena.ext.com.google.common.collect.ImmutableSet;
+
 public interface IdentityFinishCollector<T, A> extends Collector<T, A, A> {
 
+    static Set<Characteristics> CHARACTERISTICS = ImmutableSet.of(IDENTITY_FINISH);
+
     @Override
     default Function<A, A> finisher() {
         return Function.identity();
     }
+
+    @Override
+    default Set<Characteristics> characteristics() {
+        return CHARACTERISTICS;
+    }
+
+    public interface UnorderedIdentityFinishCollector<T, A> extends IdentityFinishCollector<T, A> {
+
+        static Set<Characteristics> CHARACTERISTICS = ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
+    }
+
+    public interface ConcurrentUnorderedIdentityFinishCollector<T, A> extends UnorderedIdentityFinishCollector<T, A> {
+
+        static Set<Characteristics> CHARACTERISTICS = ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-base/src/main/java/org/apache/jena/atlas/lib/Pair.java
----------------------------------------------------------------------
diff --git a/jena-base/src/main/java/org/apache/jena/atlas/lib/Pair.java b/jena-base/src/main/java/org/apache/jena/atlas/lib/Pair.java
index 0e17e3b..097e147 100644
--- a/jena-base/src/main/java/org/apache/jena/atlas/lib/Pair.java
+++ b/jena-base/src/main/java/org/apache/jena/atlas/lib/Pair.java
@@ -22,6 +22,7 @@ import static org.apache.jena.atlas.lib.Lib.hashCodeObject ;
 import static org.apache.jena.atlas.lib.StrUtils.str ;
 
 import java.util.Objects;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -51,11 +52,15 @@ public class Pair<A, B>
         }
 
         public boolean both(Function<T, Boolean> op) {
-            return op.apply(a) && op.apply(b);
+            return join(Boolean::logicalAnd, op);
         }
 
         public boolean either(Function<T, Boolean> op) {
-            return op.apply(a) || op.apply(b);
+            return join(Boolean::logicalOr, op);
+        }
+        
+        public <S, X> S join(BiFunction<X, X, S> f, Function<T, X> op) {
+            return f.apply(op.apply(a), op.apply(b));
         }
     }
     

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-core/src/main/java/org/apache/jena/util/ModelCollector.java
----------------------------------------------------------------------
diff --git a/jena-core/src/main/java/org/apache/jena/util/ModelCollector.java b/jena-core/src/main/java/org/apache/jena/util/ModelCollector.java
index 09605f5..aaa82f3 100644
--- a/jena-core/src/main/java/org/apache/jena/util/ModelCollector.java
+++ b/jena-core/src/main/java/org/apache/jena/util/ModelCollector.java
@@ -1,21 +1,67 @@
 package org.apache.jena.util;
 
+import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
-import org.apache.jena.atlas.lib.IdentityFinishCollector;
+import org.apache.jena.atlas.lib.IdentityFinishCollector.UnorderedIdentityFinishCollector;
 import org.apache.jena.rdf.model.Model;
 import org.apache.jena.rdf.model.ModelFactory;
 
-public interface ModelCollector<T> extends IdentityFinishCollector<T, Model> {
+public abstract class ModelCollector implements UnorderedIdentityFinishCollector<Model, Model> {
 
     @Override
-    default Supplier<Model> supplier() {
+    public Supplier<Model> supplier() {
         return ModelFactory::createDefaultModel;
     }
+    
+    public ConcurrentModelCollector concurrent() {
+        return new ConcurrentModelCollector(this);
+    }
 
-    @Override
-    default BinaryOperator<Model> combiner() {
-        return ModelFactory::createUnion;
+    public static class ConcurrentModelCollector extends ModelCollector
+            implements ConcurrentUnorderedIdentityFinishCollector<Model, Model> {
+
+        private final ModelCollector collector;
+
+        public ConcurrentModelCollector(ModelCollector col) {
+            this.collector = col;
+        }
+
+        @Override
+        public BiConsumer<Model, Model> accumulator() {
+            return (m1, m2) -> m2.executeInTxn(() -> m1.executeInTxn(() -> collector.accumulator().accept(m1, m2)));
+        }
+
+        @Override
+        public BinaryOperator<Model> combiner() {
+            return collector.combiner();
+        }
+    }
+
+    public static class UnionModelCollector extends ModelCollector {
+
+        @Override
+        public BinaryOperator<Model> combiner() {
+            return ModelFactory::createUnion;
+        }
+
+        @Override
+        public BiConsumer<Model, Model> accumulator() {
+            return Model::add;
+        }
+    }
+
+    public static class IntersectionModelCollector extends ModelCollector {
+
+        @Override
+        public BinaryOperator<Model> combiner() {
+            return Model::intersection;
+        }
+
+        @Override
+        public BiConsumer<Model, Model> accumulator() {
+            return (m1, m2) -> m1.remove(m1.difference(m2));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-core/src/main/java/org/apache/jena/util/ModelIntoModelCollector.java
----------------------------------------------------------------------
diff --git a/jena-core/src/main/java/org/apache/jena/util/ModelIntoModelCollector.java b/jena-core/src/main/java/org/apache/jena/util/ModelIntoModelCollector.java
deleted file mode 100644
index 413ae8f..0000000
--- a/jena-core/src/main/java/org/apache/jena/util/ModelIntoModelCollector.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.jena.util;
-
-import static java.util.stream.Collector.Characteristics.CONCURRENT;
-import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
-import static java.util.stream.Collector.Characteristics.UNORDERED;
-
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import org.apache.jena.ext.com.google.common.collect.ImmutableSet;
-import org.apache.jena.rdf.model.Model;
-
-public class ModelIntoModelCollector implements ModelCollector<Model> {
-
-    @Override
-    public BiConsumer<Model, Model> accumulator() {
-        return Model::add;
-    }
-
-    @Override
-    public Set<Characteristics> characteristics() {
-        return ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
-    }
-
-    public static class ConcurrentModelIntoModelCollector extends ModelIntoModelCollector {
-
-        @Override
-        public BiConsumer<Model, Model> accumulator() {
-            return (m1, m2) -> m1.executeInTxn(() -> m2.executeInTxn(() -> super.accumulator().accept(m1, m2)));
-        }
-
-        @Override
-        public Set<Characteristics> characteristics() {
-            return ImmutableSet.of(UNORDERED, IDENTITY_FINISH, CONCURRENT);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/f68776bc/jena-core/src/main/java/org/apache/jena/util/StatementIntoModelCollector.java
----------------------------------------------------------------------
diff --git a/jena-core/src/main/java/org/apache/jena/util/StatementIntoModelCollector.java b/jena-core/src/main/java/org/apache/jena/util/StatementIntoModelCollector.java
deleted file mode 100644
index 773bea5..0000000
--- a/jena-core/src/main/java/org/apache/jena/util/StatementIntoModelCollector.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.jena.util;
-
-import static java.util.stream.Collector.Characteristics.CONCURRENT;
-import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
-import static java.util.stream.Collector.Characteristics.UNORDERED;
-
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-import org.apache.jena.ext.com.google.common.collect.ImmutableSet;
-import org.apache.jena.rdf.model.Model;
-import org.apache.jena.rdf.model.Statement;
-
-public class StatementIntoModelCollector implements ModelCollector<Statement> {
-
-    @Override
-    public BiConsumer<Model, Statement> accumulator() {
-        return Model::add;
-    }
-
-    @Override
-    public Set<Characteristics> characteristics() {
-        return ImmutableSet.of(UNORDERED, IDENTITY_FINISH);
-    }
-
-    public static class ConcurrentStatementIntoModelCollector extends StatementIntoModelCollector {
-
-        @Override
-        public BiConsumer<Model, Statement> accumulator() {
-            return (m, s) -> m.executeInTxn(() -> m.add(s));
-        }
-
-        @Override
-        public Set<Characteristics> characteristics() {
-            return ImmutableSet.of(UNORDERED, IDENTITY_FINISH, CONCURRENT);
-        }
-    }
-}