You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by aj...@apache.org on 2018/01/06 15:17:56 UTC

[32/44] jena git commit: Moving collectors into oaj.sparql.util.compose

Moving collectors into oaj.sparql.util.compose


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/09ceffc3
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/09ceffc3
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/09ceffc3

Branch: refs/heads/master
Commit: 09ceffc3f50a51d8e236ca4e96ed979640ecab0b
Parents: 9536bcd
Author: ajs6f <aj...@apache.org>
Authored: Tue Jan 2 11:23:30 2018 -0500
Committer: ajs6f <aj...@apache.org>
Committed: Fri Jan 5 09:32:55 2018 -0500

----------------------------------------------------------------------
 .../jena/query/util/DatasetCollector.java       | 125 -------------------
 .../org/apache/jena/query/util/DatasetLib.java  |  72 -----------
 .../sparql/util/compose/DatasetCollector.java   | 125 +++++++++++++++++++
 .../jena/sparql/util/compose/DatasetLib.java    |  72 +++++++++++
 .../jena/query/util/TestDatasetCollector.java   |  78 ------------
 .../util/TestIntersectionDatasetCollector.java  |  63 ----------
 .../query/util/TestUnionDatasetCollector.java   |  71 -----------
 .../org/apache/jena/sparql/util/TS_Util.java    |   4 +-
 .../util/compose/TS_DatasetCollectors.java      |  28 +++++
 .../util/compose/TestDatasetCollector.java      |  79 ++++++++++++
 .../TestIntersectionDatasetCollector.java       |  65 ++++++++++
 .../util/compose/TestUnionDatasetCollector.java |  73 +++++++++++
 12 files changed, 445 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
deleted file mode 100644
index 1e72cd8..0000000
--- a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetCollector.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.query.util;
-
-import static org.apache.jena.system.Txn.*;
-
-import java.util.function.*;
-
-import org.apache.jena.atlas.lib.IdentityFinishCollector.UnorderedIdentityFinishCollector;
-import org.apache.jena.query.Dataset;
-import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.rdf.model.Model;
-
-public abstract class DatasetCollector implements UnorderedIdentityFinishCollector<Dataset, Dataset> {
-
-    @Override
-    public Supplier<Dataset> supplier() {
-        return DatasetFactory::createTxnMem;
-    }
-
-    public ConcurrentDatasetCollector concurrent() {
-        return new ConcurrentDatasetCollector(this);
-    }
-
-    /**
-     * Use only with {@link Dataset}s that support transactions.
-     */
-    public static class ConcurrentDatasetCollector extends DatasetCollector
-            implements ConcurrentUnorderedIdentityFinishCollector<Dataset, Dataset> {
-
-        private final DatasetCollector collector;
-
-        public ConcurrentDatasetCollector(DatasetCollector col) {
-            this.collector = col;
-        }
-
-        @Override
-        public BinaryOperator<Dataset> combiner() {
-            return (d1, d2) ->  calculateRead(d2, () -> calculateWrite(d1, () -> collector.combiner().apply(d1, d2)));
-        }
-
-        @Override
-        public BiConsumer<Dataset, Dataset> accumulator() {
-            return (d1, d2) -> executeRead(d2, () -> executeWrite(d1, () -> collector.accumulator().accept(d1, d2)));
-        }
-    }
-
-    public static class UnionDatasetCollector extends DatasetCollector {
-
-        @Override
-        public BinaryOperator<Dataset> combiner() {
-            return DatasetLib::union;
-        }
-
-        @Override
-        public BiConsumer<Dataset, Dataset> accumulator() {
-            return (d1, d2) -> {
-                d1.getDefaultModel().add(d2.getDefaultModel());
-                d2.listNames().forEachRemaining(name -> {
-                    Model union = d1.getNamedModel(name).union(d2.getNamedModel(name));
-                    d1.replaceNamedModel(name, union);
-                });
-            };
-        }
-    }
-
-    public static class IntersectionDatasetCollector extends DatasetCollector {
-
-        /**
-         * The first element is treated differently because
-         * {@link DatasetCollector#supplier()} does not provide an identity element for
-         * intersection.
-         */
-        private volatile boolean afterFirstElement = false;
-
-        @Override
-        public BinaryOperator<Dataset> combiner() {
-            return DatasetLib::intersection;
-        }
-
-        @Override
-        public BiConsumer<Dataset, Dataset> accumulator() {
-            return (d1, d2) -> {
-                if (afterFirstElement) {
-                    d1.setDefaultModel(d1.getDefaultModel().intersection(d2.getDefaultModel()));
-                    d1.listNames().forEachRemaining(name -> {
-                        if (d2.containsNamedModel(name)) {
-                            Model intersection = d1.getNamedModel(name).intersection(d2.getNamedModel(name));
-                            d1.replaceNamedModel(name, intersection);
-                        } else d1.removeNamedModel(name);
-                    });
-                } else {
-                    // first element of the stream
-                    d1.setDefaultModel(d2.getDefaultModel());
-                    d2.listNames().forEachRemaining(name -> d1.replaceNamedModel(name, d2.getNamedModel(name)));
-                    afterFirstElement = true;
-                }
-            };
-        }
-    }
-
-    static DatasetCollector union() {
-        return new UnionDatasetCollector();
-    }
-
-    static DatasetCollector intersect() {
-        return new IntersectionDatasetCollector();
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java b/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
deleted file mode 100644
index d56ce07..0000000
--- a/jena-arq/src/main/java/org/apache/jena/query/util/DatasetLib.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.query.util;
-
-import static org.apache.jena.sparql.util.Context.emptyContext;
-
-import org.apache.jena.query.Dataset;
-import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.sparql.util.Context;
-import org.apache.jena.sparql.util.DifferenceDatasetGraph;
-import org.apache.jena.sparql.util.IntersectionDatasetGraph;
-import org.apache.jena.sparql.util.UnionDatasetGraph;
-
-public class DatasetLib {
-
-    public static Dataset union(final Dataset d1, final Dataset d2, Context c) {
-        return DatasetFactory.wrap(new UnionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-    }
-
-    public static Dataset union(final Dataset d1, final Dataset d2) {
-        return union(d1, d2, emptyContext);
-    }
-
-    public static Dataset intersection(final Dataset d1, final Dataset d2, Context c) {
-        return DatasetFactory.wrap(new IntersectionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-    }
-
-    public static Dataset intersection(final Dataset d1, final Dataset d2) {
-        return intersection(d1, d2, emptyContext);
-    }
-
-    public static Dataset difference(final Dataset d1, final Dataset d2, Context c) {
-        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
-    }
-
-    public static Dataset difference(final Dataset d1, final Dataset d2) {
-        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), emptyContext));
-    }
-
-    public static Collectors collectors() {
-        return Collectors.instance;
-    }
-
-    static class Collectors {
-
-        private static final Collectors instance = new Collectors();
-
-        public DatasetCollector union() {
-            return DatasetCollector.union();
-        }
-
-        public DatasetCollector intersect() {
-            return DatasetCollector.intersect();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetCollector.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetCollector.java
new file mode 100644
index 0000000..e44e400
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetCollector.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import static org.apache.jena.system.Txn.*;
+
+import java.util.function.*;
+
+import org.apache.jena.atlas.lib.IdentityFinishCollector.UnorderedIdentityFinishCollector;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Model;
+
+public abstract class DatasetCollector implements UnorderedIdentityFinishCollector<Dataset, Dataset> {
+
+    @Override
+    public Supplier<Dataset> supplier() {
+        return DatasetFactory::createTxnMem;
+    }
+
+    public ConcurrentDatasetCollector concurrent() {
+        return new ConcurrentDatasetCollector(this);
+    }
+
+    /**
+     * Use only with {@link Dataset}s that support transactions.
+     */
+    public static class ConcurrentDatasetCollector extends DatasetCollector
+            implements ConcurrentUnorderedIdentityFinishCollector<Dataset, Dataset> {
+
+        private final DatasetCollector collector;
+
+        public ConcurrentDatasetCollector(DatasetCollector col) {
+            this.collector = col;
+        }
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return (d1, d2) ->  calculateRead(d2, () -> calculateWrite(d1, () -> collector.combiner().apply(d1, d2)));
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> executeRead(d2, () -> executeWrite(d1, () -> collector.accumulator().accept(d1, d2)));
+        }
+    }
+
+    public static class UnionDatasetCollector extends DatasetCollector {
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return DatasetLib::union;
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> {
+                d1.getDefaultModel().add(d2.getDefaultModel());
+                d2.listNames().forEachRemaining(name -> {
+                    Model union = d1.getNamedModel(name).union(d2.getNamedModel(name));
+                    d1.replaceNamedModel(name, union);
+                });
+            };
+        }
+    }
+
+    public static class IntersectionDatasetCollector extends DatasetCollector {
+
+        /**
+         * The first element is treated differently because
+         * {@link DatasetCollector#supplier()} does not provide an identity element for
+         * intersection.
+         */
+        private volatile boolean afterFirstElement = false;
+
+        @Override
+        public BinaryOperator<Dataset> combiner() {
+            return DatasetLib::intersection;
+        }
+
+        @Override
+        public BiConsumer<Dataset, Dataset> accumulator() {
+            return (d1, d2) -> {
+                if (afterFirstElement) {
+                    d1.setDefaultModel(d1.getDefaultModel().intersection(d2.getDefaultModel()));
+                    d1.listNames().forEachRemaining(name -> {
+                        if (d2.containsNamedModel(name)) {
+                            Model intersection = d1.getNamedModel(name).intersection(d2.getNamedModel(name));
+                            d1.replaceNamedModel(name, intersection);
+                        } else d1.removeNamedModel(name);
+                    });
+                } else {
+                    // first element of the stream
+                    d1.setDefaultModel(d2.getDefaultModel());
+                    d2.listNames().forEachRemaining(name -> d1.replaceNamedModel(name, d2.getNamedModel(name)));
+                    afterFirstElement = true;
+                }
+            };
+        }
+    }
+
+    static DatasetCollector union() {
+        return new UnionDatasetCollector();
+    }
+
+    static DatasetCollector intersect() {
+        return new IntersectionDatasetCollector();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetLib.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetLib.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetLib.java
new file mode 100644
index 0000000..db45d7f
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/compose/DatasetLib.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import static org.apache.jena.sparql.util.Context.emptyContext;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.DifferenceDatasetGraph;
+import org.apache.jena.sparql.util.IntersectionDatasetGraph;
+import org.apache.jena.sparql.util.UnionDatasetGraph;
+
+public class DatasetLib {
+
+    public static Dataset union(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new UnionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
+
+    public static Dataset union(final Dataset d1, final Dataset d2) {
+        return union(d1, d2, emptyContext);
+    }
+
+    public static Dataset intersection(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new IntersectionDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
+
+    public static Dataset intersection(final Dataset d1, final Dataset d2) {
+        return intersection(d1, d2, emptyContext);
+    }
+
+    public static Dataset difference(final Dataset d1, final Dataset d2, Context c) {
+        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), c));
+    }
+
+    public static Dataset difference(final Dataset d1, final Dataset d2) {
+        return DatasetFactory.wrap(new DifferenceDatasetGraph(d1.asDatasetGraph(), d2.asDatasetGraph(), emptyContext));
+    }
+
+    public static Collectors collectors() {
+        return Collectors.instance;
+    }
+
+    public static class Collectors {
+
+        private static final Collectors instance = new Collectors();
+
+        public DatasetCollector union() {
+            return DatasetCollector.union();
+        }
+
+        public DatasetCollector intersect() {
+            return DatasetCollector.intersect();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/query/util/TestDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/query/util/TestDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/query/util/TestDatasetCollector.java
deleted file mode 100644
index 10e24dc..0000000
--- a/jena-arq/src/test/java/org/apache/jena/query/util/TestDatasetCollector.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.query.util;
-
-import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
-import static org.apache.jena.sparql.sse.SSE.parseGraph;
-
-import java.util.stream.Stream;
-
-import org.apache.jena.graph.*;
-import org.apache.jena.query.Dataset;
-import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.rdf.model.Model;
-import org.junit.Assert;
-import org.junit.Test;
-
-public abstract class TestDatasetCollector extends Assert {
-
-    public abstract DatasetCollector testInstance();
-
-    @Test
-    public void collectionOfEmptyStreamShouldBeEmpty() {
-        final Dataset collected = Stream.<Dataset>empty().collect(testInstance());
-        assertTrue(collected.isEmpty());
-    }
-
-    @Test
-    public void collectionOfStreamOfEmptyDatasetsShouldBeEmpty() {
-        Stream<Dataset> stream = Stream.<Dataset>builder()
-                .add(DatasetFactory.create())
-                .add(DatasetFactory.create())
-                .add(DatasetFactory.create()).build();
-        final Dataset collected = stream.collect(testInstance());
-        assertTrue(collected.isEmpty());
-    }
-
-    @Test(expected=NullPointerException.class)
-    public void noNullDatasetsAllowed() {
-        Stream.<Dataset>builder().add(null).build().collect(testInstance());
-    }
-    
-    @Test
-    public void collectingOneDatasetGivesThatDataset() {
-        Graph graph = parseGraph("(graph (triple <s1> <p1> <o1> ))");
-        Model model = createModelForGraph(graph);
-        Dataset dataset = DatasetFactory.create(model);
-        Node graphName = NodeFactory.createBlankNode();
-        dataset.addNamedModel(graphName.toString(), model);
-        Dataset collection = Stream.<Dataset>builder().add(dataset).build().collect(testInstance());
-        assertDatasetsAreIsomorphicPerGraph(dataset, collection);
-    }
-
-    protected static void assertDatasetsAreIsomorphicPerGraph(Dataset dataset1, Dataset dataset2) {
-        assertGraphsAreIsomorphic(dataset1.getDefaultModel(), dataset2.getDefaultModel());
-        dataset1.listNames().forEachRemaining(graphName ->
-            assertGraphsAreIsomorphic(dataset1.getNamedModel(graphName), dataset2.getNamedModel(graphName)));
-    }
-    
-    protected static void assertGraphsAreIsomorphic(Model graph1, Model graph2) {
-        assertTrue(graph1.isIsomorphicWith(graph2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/query/util/TestIntersectionDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/query/util/TestIntersectionDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/query/util/TestIntersectionDatasetCollector.java
deleted file mode 100644
index 1e30f54..0000000
--- a/jena-arq/src/test/java/org/apache/jena/query/util/TestIntersectionDatasetCollector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.query.util;
-
-import static org.apache.jena.graph.NodeFactory.createBlankNode;
-import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
-import static org.apache.jena.sparql.sse.SSE.parseGraph;
-
-import java.util.stream.Stream;
-
-import org.apache.jena.query.Dataset;
-import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.rdf.model.Model;
-import org.junit.Test;
-
-public class TestIntersectionDatasetCollector extends TestDatasetCollector {
-
-    @Override
-    public DatasetCollector testInstance() {
-        return DatasetLib.collectors().intersect();
-    }
-
-    @Test
-    public void testIntersection() {
-        final Model m1 = createModelForGraph(parseGraph("(graph (triple <s1> <p1> <o1> ))"));
-        final Dataset ds1 = DatasetFactory.create(m1);
-        final String graphName1 = createBlankNode().toString();
-        ds1.addNamedModel(graphName1, m1);
-        final Model m2 = createModelForGraph(parseGraph("(graph (triple <s2> <p2> <o2> ))"));
-        final Dataset ds2 = DatasetFactory.create(m2);
-        final String graphName2 = createBlankNode().toString();
-        ds2.addNamedModel(graphName2, m2);
-        final Model m3 = createModelForGraph(parseGraph("(graph (triple <s3> <p3> <o3> ))"));
-        final String graphName3 = createBlankNode().toString();
-        ds1.addNamedModel(graphName3, m3);
-        ds2.addNamedModel(graphName3, m3);
-        
-        final Stream<Dataset> stream = Stream.<Dataset>builder().add(ds1).add(ds2).build();
-        Dataset ds = stream.collect(testInstance());
-        
-        assertTrue(ds.getDefaultModel().isEmpty());
-        assertTrue(ds.getNamedModel(graphName1).isEmpty());
-        assertTrue(ds.getNamedModel(graphName2).isEmpty());
-        assertTrue(m3.isIsomorphicWith(ds.getNamedModel(graphName3)));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/query/util/TestUnionDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/query/util/TestUnionDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/query/util/TestUnionDatasetCollector.java
deleted file mode 100644
index 2682435..0000000
--- a/jena-arq/src/test/java/org/apache/jena/query/util/TestUnionDatasetCollector.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.query.util;
-
-import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
-import static org.apache.jena.sparql.sse.SSE.parseGraph;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-import org.apache.jena.atlas.iterator.Iter;
-import org.apache.jena.graph.Graph;
-import org.apache.jena.graph.NodeFactory;
-import org.apache.jena.query.Dataset;
-import org.apache.jena.query.DatasetFactory;
-import org.apache.jena.rdf.model.Model;
-import org.apache.jena.rdf.model.Statement;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestUnionDatasetCollector extends TestDatasetCollector {
-
-    @Override
-    public DatasetCollector testInstance() {
-        return DatasetLib.collectors().union();
-    }
-
-    @Test
-    public void testUnion() {
-        final Graph g1 = parseGraph("(graph (triple <s1> <p1> <o1> ))");
-        final Model m1 = createModelForGraph(g1);
-        final Dataset dsg1 = DatasetFactory.create(m1);
-        final String graphName1 = NodeFactory.createBlankNode().toString();
-        dsg1.addNamedModel(graphName1, m1);
-        final Graph g2 = parseGraph("(graph (triple <s2> <p2> <o2> ))");
-        final Dataset dsg2 = DatasetFactory.create(createModelForGraph(g2));
-        final Model m2 = createModelForGraph(g2);
-        final String graphName2 = NodeFactory.createBlankNode().toString();
-        dsg2.addNamedModel(graphName2, m2);
-        final Stream<Dataset> stream = Stream.<Dataset>builder().add(dsg1).add(dsg2).build();
-        Dataset dataset = stream.collect(testInstance());
-
-        assertEquals(2, Iter.count(dataset.listNames()));
-        assertTrue(m1.isIsomorphicWith(dataset.getNamedModel(graphName1)));
-        assertTrue(m2.isIsomorphicWith(dataset.getNamedModel(graphName2)));
-        // all statements in any input should be present in the union
-        m1.listStatements().mapWith(dataset.getDefaultModel()::contains).forEachRemaining(Assert::assertTrue);
-        m2.listStatements().mapWith(dataset.getDefaultModel()::contains).forEachRemaining(Assert::assertTrue);
-        // all statements in the union should be present in an input
-        List<Statement> leftovers = dataset.getDefaultModel().listStatements()
-                .filterDrop(m1::contains)
-                .filterDrop(m2::contains).toList();
-        assertTrue(leftovers.isEmpty());
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/sparql/util/TS_Util.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/util/TS_Util.java b/jena-arq/src/test/java/org/apache/jena/sparql/util/TS_Util.java
index 9efae91..969836b 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/util/TS_Util.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/util/TS_Util.java
@@ -19,6 +19,7 @@
 package org.apache.jena.sparql.util;
 
 import org.apache.jena.atlas.lib.TestDateTimeUtils ;
+import org.apache.jena.sparql.util.compose.TS_DatasetCollectors;
 import org.junit.runner.RunWith ;
 import org.junit.runners.Suite ;
 import org.junit.runners.Suite.SuiteClasses ;
@@ -29,7 +30,8 @@ import org.junit.runners.Suite.SuiteClasses ;
     TestList.class ,
     TestDateTimeUtils.class ,
     TestFmtUtils.class,
-    TS_DatasetGraphViews.class
+    TS_DatasetGraphViews.class,
+    TS_DatasetCollectors.class
 })
 public class TS_Util
 { }

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TS_DatasetCollectors.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TS_DatasetCollectors.java b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TS_DatasetCollectors.java
new file mode 100644
index 0000000..2087686
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TS_DatasetCollectors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+@RunWith(Suite.class)
+@SuiteClasses({ TestIntersectionDatasetCollector.class, TestUnionDatasetCollector.class })
+public class TS_DatasetCollectors {
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestDatasetCollector.java
new file mode 100644
index 0000000..c0e4578
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestDatasetCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
+import static org.apache.jena.sparql.sse.SSE.parseGraph;
+
+import java.util.stream.Stream;
+
+import org.apache.jena.graph.*;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.sparql.util.compose.DatasetCollector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public abstract class TestDatasetCollector extends Assert {
+
+    public abstract DatasetCollector testInstance();
+
+    @Test
+    public void collectionOfEmptyStreamShouldBeEmpty() {
+        final Dataset collected = Stream.<Dataset>empty().collect(testInstance());
+        assertTrue(collected.isEmpty());
+    }
+
+    @Test
+    public void collectionOfStreamOfEmptyDatasetsShouldBeEmpty() {
+        Stream<Dataset> stream = Stream.<Dataset>builder()
+                .add(DatasetFactory.create())
+                .add(DatasetFactory.create())
+                .add(DatasetFactory.create()).build();
+        final Dataset collected = stream.collect(testInstance());
+        assertTrue(collected.isEmpty());
+    }
+
+    @Test(expected=NullPointerException.class)
+    public void noNullDatasetsAllowed() {
+        Stream.<Dataset>builder().add(null).build().collect(testInstance());
+    }
+    
+    @Test
+    public void collectingOneDatasetGivesThatDataset() {
+        Graph graph = parseGraph("(graph (triple <s1> <p1> <o1> ))");
+        Model model = createModelForGraph(graph);
+        Dataset dataset = DatasetFactory.create(model);
+        Node graphName = NodeFactory.createBlankNode();
+        dataset.addNamedModel(graphName.toString(), model);
+        Dataset collection = Stream.<Dataset>builder().add(dataset).build().collect(testInstance());
+        assertDatasetsAreIsomorphicPerGraph(dataset, collection);
+    }
+
+    protected static void assertDatasetsAreIsomorphicPerGraph(Dataset dataset1, Dataset dataset2) {
+        assertGraphsAreIsomorphic(dataset1.getDefaultModel(), dataset2.getDefaultModel());
+        dataset1.listNames().forEachRemaining(graphName ->
+            assertGraphsAreIsomorphic(dataset1.getNamedModel(graphName), dataset2.getNamedModel(graphName)));
+    }
+    
+    protected static void assertGraphsAreIsomorphic(Model graph1, Model graph2) {
+        assertTrue(graph1.isIsomorphicWith(graph2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestIntersectionDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestIntersectionDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestIntersectionDatasetCollector.java
new file mode 100644
index 0000000..ffa48bd
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestIntersectionDatasetCollector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import static org.apache.jena.graph.NodeFactory.createBlankNode;
+import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
+import static org.apache.jena.sparql.sse.SSE.parseGraph;
+
+import java.util.stream.Stream;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.sparql.util.compose.DatasetCollector;
+import org.apache.jena.sparql.util.compose.DatasetLib;
+import org.junit.Test;
+
+public class TestIntersectionDatasetCollector extends TestDatasetCollector {
+
+    @Override
+    public DatasetCollector testInstance() {
+        return DatasetLib.collectors().intersect();
+    }
+
+    @Test
+    public void testIntersection() {
+        final Model m1 = createModelForGraph(parseGraph("(graph (triple <s1> <p1> <o1> ))"));
+        final Dataset ds1 = DatasetFactory.create(m1);
+        final String graphName1 = createBlankNode().toString();
+        ds1.addNamedModel(graphName1, m1);
+        final Model m2 = createModelForGraph(parseGraph("(graph (triple <s2> <p2> <o2> ))"));
+        final Dataset ds2 = DatasetFactory.create(m2);
+        final String graphName2 = createBlankNode().toString();
+        ds2.addNamedModel(graphName2, m2);
+        final Model m3 = createModelForGraph(parseGraph("(graph (triple <s3> <p3> <o3> ))"));
+        final String graphName3 = createBlankNode().toString();
+        ds1.addNamedModel(graphName3, m3);
+        ds2.addNamedModel(graphName3, m3);
+        
+        final Stream<Dataset> stream = Stream.<Dataset>builder().add(ds1).add(ds2).build();
+        Dataset ds = stream.collect(testInstance());
+        
+        assertTrue(ds.getDefaultModel().isEmpty());
+        assertTrue(ds.getNamedModel(graphName1).isEmpty());
+        assertTrue(ds.getNamedModel(graphName2).isEmpty());
+        assertTrue(m3.isIsomorphicWith(ds.getNamedModel(graphName3)));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/09ceffc3/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestUnionDatasetCollector.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestUnionDatasetCollector.java b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestUnionDatasetCollector.java
new file mode 100644
index 0000000..656b124
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/util/compose/TestUnionDatasetCollector.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.util.compose;
+
+import static org.apache.jena.rdf.model.ModelFactory.createModelForGraph;
+import static org.apache.jena.sparql.sse.SSE.parseGraph;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.rdf.model.Statement;
+import org.apache.jena.sparql.util.compose.DatasetCollector;
+import org.apache.jena.sparql.util.compose.DatasetLib;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnionDatasetCollector extends TestDatasetCollector {
+
+    @Override
+    public DatasetCollector testInstance() {
+        return DatasetLib.collectors().union();
+    }
+
+    @Test
+    public void testUnion() {
+        final Graph g1 = parseGraph("(graph (triple <s1> <p1> <o1> ))");
+        final Model m1 = createModelForGraph(g1);
+        final Dataset dsg1 = DatasetFactory.create(m1);
+        final String graphName1 = NodeFactory.createBlankNode().toString();
+        dsg1.addNamedModel(graphName1, m1);
+        final Graph g2 = parseGraph("(graph (triple <s2> <p2> <o2> ))");
+        final Dataset dsg2 = DatasetFactory.create(createModelForGraph(g2));
+        final Model m2 = createModelForGraph(g2);
+        final String graphName2 = NodeFactory.createBlankNode().toString();
+        dsg2.addNamedModel(graphName2, m2);
+        final Stream<Dataset> stream = Stream.<Dataset>builder().add(dsg1).add(dsg2).build();
+        Dataset dataset = stream.collect(testInstance());
+
+        assertEquals(2, Iter.count(dataset.listNames()));
+        assertTrue(m1.isIsomorphicWith(dataset.getNamedModel(graphName1)));
+        assertTrue(m2.isIsomorphicWith(dataset.getNamedModel(graphName2)));
+        // all statements in any input should be present in the union
+        m1.listStatements().mapWith(dataset.getDefaultModel()::contains).forEachRemaining(Assert::assertTrue);
+        m2.listStatements().mapWith(dataset.getDefaultModel()::contains).forEachRemaining(Assert::assertTrue);
+        // all statements in the union should be present in an input
+        List<Statement> leftovers = dataset.getDefaultModel().listStatements()
+                .filterDrop(m1::contains)
+                .filterDrop(m2::contains).toList();
+        assertTrue(leftovers.isEmpty());
+    }
+}