You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/04/07 14:40:35 UTC
[1/4] incubator-rya git commit: RYA-50 Updated Rya PCJs to support
SPARQL queries with OPTIONALs.
Repository: incubator-rya
Updated Branches:
refs/heads/develop 1d92d1991 -> 30ca57ede
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
new file mode 100644
index 0000000..15023c5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests the methods of {@link NaturalJoin}.
+ */
+public class NaturalJoinTest {
+
+ private final ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void newLeftResult_noRightMatches() {
+ IterativeJoin naturalJoin = new NaturalJoin();
+
+ // There is a new left result.
+ MapBindingSet newLeftResult = new MapBindingSet();
+ newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+
+ // There are no right results that join with the left result.
+ Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+
+ // Therefore, there are no new join results.
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+ assertFalse( newJoinResultsIt.hasNext() );
+ }
+
+ @Test
+ public void newLeftResult_joinsWithRightResults() {
+ IterativeJoin naturalJoin = new NaturalJoin();
+
+ // There is a new left result.
+ MapBindingSet newLeftResult = new MapBindingSet();
+ newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+ newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+
+ // There are a few right results that join with the left result.
+ MapBindingSet nameAge = new MapBindingSet();
+ nameAge.addBinding("name", vf.createLiteral("Bob"));
+ nameAge.addBinding("age", vf.createLiteral(56));
+
+ MapBindingSet nameHair = new MapBindingSet();
+ nameHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+ Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+ // Therefore, there are a few new join results that mix the two together.
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+
+ Set<BindingSet> newJoinResults = new HashSet<>();
+ while(newJoinResultsIt.hasNext()) {
+ newJoinResults.add( newJoinResultsIt.next() );
+ }
+
+ Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ MapBindingSet nameHeightAge = new MapBindingSet();
+ nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightAge.addBinding("age", vf.createLiteral(56));
+ expected.add(nameHeightAge);
+
+ MapBindingSet nameHeightHair = new MapBindingSet();
+ nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+ expected.add(nameHeightHair);
+
+ assertEquals(expected, newJoinResults);
+ }
+
+ @Test
+ public void newRightResult_noLeftMatches() {
+ IterativeJoin naturalJoin = new NaturalJoin();
+
+ // There are no left results.
+ Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+
+ // There is a new right result.
+ MapBindingSet newRightResult = new MapBindingSet();
+ newRightResult.addBinding("name", vf.createLiteral("Bob"));
+
+ // Therefore, there are no new join results.
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+ assertFalse( newJoinResultsIt.hasNext() );
+ }
+
+ @Test
+ public void newRightResult_joinsWithLeftResults() {
+ IterativeJoin naturalJoin = new NaturalJoin();
+
+ // There are a few left results that join with the new right result.
+ MapBindingSet nameAge = new MapBindingSet();
+ nameAge.addBinding("name", vf.createLiteral("Bob"));
+ nameAge.addBinding("age", vf.createLiteral(56));
+
+ MapBindingSet nameHair = new MapBindingSet();
+ nameHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+ Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+ // There is a new right result.
+ MapBindingSet newRightResult = new MapBindingSet();
+ newRightResult.addBinding("name", vf.createLiteral("Bob"));
+ newRightResult.addBinding("height", vf.createLiteral("5'9\""));
+
+ // Therefore, there are a few new join results that mix the two together.
+ Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+
+ Set<BindingSet> newJoinResults = new HashSet<>();
+ while(newJoinResultsIt.hasNext()) {
+ newJoinResults.add( newJoinResultsIt.next() );
+ }
+
+ Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ MapBindingSet nameHeightAge = new MapBindingSet();
+ nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightAge.addBinding("age", vf.createLiteral(56));
+ expected.add(nameHeightAge);
+
+ MapBindingSet nameHeightHair = new MapBindingSet();
+ nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+ expected.add(nameHeightHair);
+
+ assertEquals(expected, newJoinResults);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index a772b83..0cbfa9a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -62,6 +62,7 @@ import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
@@ -75,6 +76,8 @@ import mvm.rya.rdftriplestore.RyaSailRepository;
public class FluoAndHistoricPcjsDemo implements Demo {
private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class);
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
// Employees
private static final RyaURI alice = new RyaURI("http://Alice");
private static final RyaURI bob = new RyaURI("http://Bob");
@@ -350,11 +353,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
for(final Entry<Key, Value> entry : scanner) {
final byte[] serializedResult = entry.getKey().getRow().getBytes();
- final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+ final BindingSet result = converter.convert(serializedResult, varOrder);
fetchedResults.put(varOrder.toString(), result);
}
}
- } catch(PcjException | TableNotFoundException | RyaTypeResolverException e) {
+ } catch(PcjException | TableNotFoundException | BindingSetConversionException e) {
throw new DemoExecutionException("Couldn't fetch the binding sets that were exported to the PCJ table, so the demo can not continue. Exiting.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index c9af9aa..cfb5248 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -78,6 +77,8 @@ import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
import mvm.rya.rdftriplestore.RdfCloudTripleStore;
import mvm.rya.rdftriplestore.RyaSailRepository;
@@ -260,17 +261,19 @@ public abstract class ITBase {
// Fetch the query's variable order.
final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
- final String[] varOrder = queryMetadata.getVariableOrder().toArray();
+ final VariableOrder varOrder = queryMetadata.getVariableOrder();
// Fetch the Binding Sets for the query.
final ScannerConfiguration scanConfig = new ScannerConfiguration();
scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
+ BindingSetStringConverter converter = new BindingSetStringConverter();
+
final RowIterator rowIter = snapshot.get(scanConfig);
while(rowIter.hasNext()) {
final Entry<Bytes, ColumnIterator> row = rowIter.next();
final String bindingSetString = row.getValue().next().getValue().toString();
- final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+ final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
bindingSets.add(bindingSet);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 231c8f1..601dfd4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -21,6 +21,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
import org.junit.Test;
import org.openrdf.query.MalformedQueryException;
@@ -99,6 +100,7 @@ public class FluoQueryMetadataDAOIT extends ITBase {
// Create the object that will be serialized.
final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId");
builder.setVariableOrder(new VariableOrder("g;y;s"));
+ builder.setJoinType(JoinType.NATURAL_JOIN);
builder.setParentNodeId("parentNodeId");
builder.setLeftChildNodeId("leftChildNodeId");
builder.setRightChildNodeId("rightChildNodeId");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 32b7d84..46db8cd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -45,6 +45,50 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
public class QueryIT extends ITBase {
+ @Test
+ public void optionalStatements() throws Exception {
+ // A query that has optional statement patterns. This query is looking for all
+ // people who have Law degrees and any BAR exams they have passed (though they
+ // do not have to have passed any).
+ final String sparql =
+ "SELECT ?person ?exam " +
+ "WHERE {" +
+ "?person <http://hasDegreeIn> <http://Law> . " +
+ "OPTIONAL {?person <http://passedExam> ?exam } . " +
+ "}";
+
+ // Triples that will be streamed into Fluo after the PCJ has been created.
+ final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+ makeRyaStatement("http://Alice", "http://hasDegreeIn", "http://Computer Science"),
+ makeRyaStatement("http://Alice", "http://passedExam", "http://Certified Ethical Hacker"),
+ makeRyaStatement("http://Bob", "http://hasDegreeIn", "http://Law"),
+ makeRyaStatement("http://Bob", "http://passedExam", "http://MBE"),
+ makeRyaStatement("http://Bob", "http://passedExam", "http://BAR-Kansas"),
+ makeRyaStatement("http://Charlie", "http://hasDegreeIn", "http://Law"));
+
+ // The expected results of the SPARQL query once the PCJ has been computed.
+ final Set<BindingSet> expected = new HashSet<>();
+ expected.add( makeBindingSet(
+ new BindingImpl("person", new URIImpl("http://Bob")),
+ new BindingImpl("exam", new URIImpl("http://MBE"))));
+ expected.add( makeBindingSet(
+ new BindingImpl("person", new URIImpl("http://Bob")),
+ new BindingImpl("exam", new URIImpl("http://BAR-Kansas"))));
+ expected.add( makeBindingSet(
+ new BindingImpl("person", new URIImpl("http://Charlie"))));
+
+ // Create the PCJ in Fluo.
+ new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+ // Stream the data into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples);
+
+ // Verify the end results of the query match the expected results.
+ fluo.waitForObservers();
+ final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+ assertEquals(expected, results);
+ }
+
/**
* Tests when there are a bunch of variables across a bunch of joins.
*/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index ae728cd..ee3fffd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -52,6 +52,7 @@ import io.fluo.api.data.Bytes;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
@@ -64,6 +65,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
public class RyaExportIT extends ITBase {
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
/**
* Configure the export observer to use the Mini Accumulo instance as the
* export destination for new PCJ results.
@@ -163,7 +166,7 @@ public class RyaExportIT extends ITBase {
* multimap stores a set of deserialized binding sets that were in the PCJ
* table for every variable order that is found in the PCJ metadata.
*/
- private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, RyaTypeResolverException {
+ private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException {
final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
// Get the variable orders the data was written to.
@@ -177,7 +180,7 @@ public class RyaExportIT extends ITBase {
for(final Entry<Key, Value> entry : scanner) {
final byte[] serializedResult = entry.getKey().getRow().getBytes();
- final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+ final BindingSet result = converter.convert(serializedResult, varOrder);
fetchedResults.put(varOrder.toString(), result);
}
}
[3/4] incubator-rya git commit: RYA-50 Updated Rya PCJs to support
SPARQL queries with OPTIONALs.
Posted by pu...@apache.org.
RYA-50 Updated Rya PCJs to support SPARQL queries with OPTIONALs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/a24515b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/a24515b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/a24515b0
Branch: refs/heads/develop
Commit: a24515b0d1c7c3e459d68a867160850aa7d451c1
Parents: c378f64
Author: Kevin Chilton <ke...@localhost.localdomain>
Authored: Tue Mar 22 14:49:54 2016 -0400
Committer: Kevin Chilton <ke...@localhost.localdomain>
Committed: Mon Apr 4 18:40:09 2016 -0400
----------------------------------------------------------------------
.../accumulo/precompQuery/AccumuloPcjQuery.java | 40 +-
.../indexing/external/PrecompJoinOptimizer.java | 20 +
.../external/tupleSet/AccumuloIndexSet.java | 4 +-
.../tupleSet/AccumuloPcjSerializer.java | 199 ++++++----
.../external/tupleSet/BindingSetConverter.java | 108 +++++
.../tupleSet/BindingSetStringConverter.java | 111 +++---
.../indexing/external/tupleSet/PcjTables.java | 26 +-
.../indexing/external/PCJOptionalTestIT.java | 324 +++++++++++++++
.../external/PcjIntegrationTestingUtil.java | 9 +-
.../external/PrecompJoinOptimizerTest.java | 2 +
.../tupleSet/AccumuloPcjSerialzerTest.java | 154 +++++---
.../tupleSet/BindingSetStringConverterTest.java | 151 +++++--
.../tupleSet/PcjTablesIntegrationTests.java | 13 +-
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 15 +-
.../indexing/pcj/fluo/app/BindingSetRow.java | 23 +-
.../pcj/fluo/app/FilterResultUpdater.java | 10 +-
.../pcj/fluo/app/FluoStringConverter.java | 152 -------
.../pcj/fluo/app/JoinResultUpdater.java | 395 +++++++++++++------
.../pcj/fluo/app/QueryResultUpdater.java | 10 +-
.../fluo/app/observers/BindingSetUpdater.java | 7 +-
.../pcj/fluo/app/observers/FilterObserver.java | 10 +-
.../pcj/fluo/app/observers/JoinObserver.java | 10 +-
.../fluo/app/observers/QueryResultObserver.java | 9 +-
.../app/observers/StatementPatternObserver.java | 10 +-
.../pcj/fluo/app/query/FluoQueryColumns.java | 2 +
.../fluo/app/query/FluoQueryMetadataDAO.java | 7 +
.../pcj/fluo/app/query/JoinMetadata.java | 35 ++
.../fluo/app/query/SparqlFluoQueryBuilder.java | 64 +--
.../pcj/fluo/app/FluoStringConverterTest.java | 74 ----
.../pcj/fluo/app/LeftOuterJoinTest.java | 174 ++++++++
.../indexing/pcj/fluo/app/NaturalJoinTest.java | 166 ++++++++
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 7 +-
.../apache/rya/indexing/pcj/fluo/ITBase.java | 9 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 2 +
.../indexing/pcj/fluo/integration/QueryIT.java | 44 +++
.../pcj/fluo/integration/RyaExportIT.java | 7 +-
36 files changed, 1730 insertions(+), 673 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
index 08c0f78..dd1e9c9 100644
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
@@ -37,7 +37,9 @@ import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.PcjQuery;
import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
@@ -68,9 +70,11 @@ import com.google.common.collect.Sets;
*
*/
public class AccumuloPcjQuery implements PcjQuery {
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
private final Connector accCon;
private final String tableName;
-
+
public AccumuloPcjQuery(Connector accCon, String tableName) {
this.accCon = accCon;
this.tableName = tableName;
@@ -119,10 +123,9 @@ public class AccumuloPcjQuery implements PcjQuery {
}
}
try {
- rangePrefix = AccumuloPcjSerializer.serialize(rangeBs,
- commonVars.toArray(new String[commonVars.size()]));
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
+ rangePrefix = converter.convert(rangeBs, new VariableOrder(commonVars));
+ } catch (final BindingSetConversionException e) {
+ throw new QueryEvaluationException(e);
}
final Range r = Range.prefix(new Text(rangePrefix));
map.put(r, bSet);
@@ -149,10 +152,9 @@ public class AccumuloPcjQuery implements PcjQuery {
}
}
try {
- rangePrefix = AccumuloPcjSerializer.serialize(rangeBs,
- commonVars.toArray(new String[commonVars.size()]));
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
+ rangePrefix = converter.convert(rangeBs, new VariableOrder(commonVars));
+ } catch (final BindingSetConversionException e) {
+ throw new QueryEvaluationException(e);
}
final Range r = Range.prefix(new Text(rangePrefix));
ranges.add(r);
@@ -224,7 +226,7 @@ public class AccumuloPcjQuery implements PcjQuery {
BindingSet bs;
try {
bs = getBindingSetWithoutConstants(k, tableVarMap);
- } catch (final RyaTypeResolverException e) {
+ } catch (final BindingSetConversionException e) {
throw new QueryEvaluationException(e);
}
currentSolutionBs = new QueryBindingSet();
@@ -239,7 +241,7 @@ public class AccumuloPcjQuery implements PcjQuery {
try {
rangePrefix = getPrefixByte(bs, constValMap,
prefixVars);
- } catch (final RyaTypeResolverException e) {
+ } catch (final BindingSetConversionException e) {
throw new QueryEvaluationException(e);
}
final Range r = Range.prefix(new Text(rangePrefix));
@@ -312,7 +314,7 @@ public class AccumuloPcjQuery implements PcjQuery {
*/
private static byte[] getPrefixByte(BindingSet bs,
Map<String, org.openrdf.model.Value> valMap, List<String> prefixVars)
- throws RyaTypeResolverException {
+ throws BindingSetConversionException {
final QueryBindingSet bSet = new QueryBindingSet();
for (final String var : prefixVars) {
if (var.startsWith(ExternalTupleSet.CONST_PREFIX)) {
@@ -322,8 +324,8 @@ public class AccumuloPcjQuery implements PcjQuery {
bSet.addBinding(var, bs.getBinding(var).getValue());
}
}
- return AccumuloPcjSerializer.serialize(bSet,
- prefixVars.toArray(new String[prefixVars.size()]));
+
+ return converter.convert(bSet, new VariableOrder(prefixVars));
}
/**
@@ -331,15 +333,17 @@ public class AccumuloPcjQuery implements PcjQuery {
* @param key - Accumulo key obtained from scan
* @param tableVarMap - map that associated query variables and table variables
* @return - BindingSet without values associated with constant constraints
- * @throws RyaTypeResolverException
+ * @throws BindingSetConversionException
*/
private static BindingSet getBindingSetWithoutConstants(Key key,
- Map<String, String> tableVarMap) throws RyaTypeResolverException {
+ Map<String, String> tableVarMap) throws BindingSetConversionException {
final byte[] row = key.getRow().getBytes();
final String[] varOrder = key.getColumnFamily().toString()
.split(ExternalTupleSet.VAR_ORDER_DELIM);
- final QueryBindingSet temp = new QueryBindingSet(
- AccumuloPcjSerializer.deSerialize(row, varOrder));
+
+ BindingSet bindingSet = converter.convert(row, new VariableOrder(varOrder));
+ final QueryBindingSet temp = new QueryBindingSet(bindingSet);
+
final QueryBindingSet bs = new QueryBindingSet();
for (final String var : temp.getBindingNames()) {
if (!tableVarMap.get(var).startsWith(ExternalTupleSet.CONST_PREFIX)) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
index eb0f042..f8c6c77 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
@@ -20,6 +20,7 @@ package mvm.rya.indexing.external;
*/
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -241,6 +242,25 @@ public class PrecompJoinOptimizer implements QueryOptimizer, Configurable {
this.segmentFilters = segmentFilters;
}
+
+ // this handles the case when the optional/LeftJoin is the first node
+ // below the Projection node. Checks to see if any of the ExternalTupleSets
+ // match the LeftJoin exactly.
+ //TODO ExternalTupleSet won't match this LeftJoin if query contains Filters and order of
+ //filters does not match order of filters in ExternalTupleSet after filters are pushed down
+ @Override
+ public void meet(LeftJoin node) {
+
+ updateFilters(segmentFilters, true);
+
+ List<TupleExpr> joinArgs = matchExternalTupleSets(Arrays.asList((TupleExpr) node), tupList);
+ if(joinArgs.size() == 1 && joinArgs.get(0) instanceof ExternalTupleSet) {
+ node.replaceWith(joinArgs.get(0));
+ }
+ return;
+ }
+
+
@Override
public void meet(Join node) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
index 9c1bc7b..6011a52 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
@@ -104,7 +104,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchi
@Override
public String getSignature() {
- return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(this.getTupleExpr().getAssuredBindingNames());
+ return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(this.getTupleExpr().getBindingNames());
}
/**
@@ -130,7 +130,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchi
throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection.");
}
setProjectionExpr(projection.get());
-
+
Set<VariableOrder> orders = null;
try {
orders = pcj.getPcjMetadata(accCon, tablename).getVarOrders();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java
index 452ea61..bb87d7f 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java
@@ -19,104 +19,169 @@ package mvm.rya.indexing.external.tupleSet;
* under the License.
*/
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
import org.openrdf.model.Value;
+import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import com.google.common.base.Preconditions;
import com.google.common.primitives.Bytes;
/**
- * AccumuloPcjSerializer provides two methods, serialize and deserialize, which
- * are used for writing BindingSets to PCJ tables and reading serialized byte
- * representations of BindingSets from PCJ tables.
- *
+ * Converts {@link BindingSet}s to byte[]s and back again. The bytes do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
*/
-public class AccumuloPcjSerializer {
-
- /**
- *
- * @param bs {@link BindingSet} to be serialized
- * @param varOrder order in which binding values should be written to byte array
- * @return byte array containing serialized values written in order indicated by varOrder
- * @throws RyaTypeResolverException
- */
- public static byte[] serialize(BindingSet bs, String[] varOrder) throws RyaTypeResolverException {
- byte[] byteArray = null;
- int i = 0;
- Preconditions.checkNotNull(bs);
- Preconditions.checkNotNull(varOrder);
- Preconditions.checkArgument(bs.size() == varOrder.length);
- for(final String varName: varOrder) {
- final RyaType rt = RdfToRyaConversions.convertValue(bs.getBinding(varName).getValue());
- final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt);
- if(i == 0) {
- byteArray = Bytes.concat(serializedVal[0], serializedVal[1], DELIM_BYTES);
- } else {
- byteArray = Bytes.concat(byteArray, serializedVal[0], serializedVal[1], DELIM_BYTES);
+@ParametersAreNonnullByDefault
+public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
+
+ @Override
+ public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException {
+ checkNotNull(bindingSet);
+ checkNotNull(varOrder);
+ checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+ // A list that holds all of the byte segments that will be concatenated at the end.
+ // This minimizes byte[] construction.
+ final List<byte[]> byteSegments = new LinkedList<>();
+
+ try {
+ for(final String varName: varOrder) {
+ // Only write information for a variable name if the binding set contains it.
+ if(bindingSet.hasBinding(varName)) {
+ final RyaType rt = RdfToRyaConversions.convertValue(bindingSet.getBinding(varName).getValue());
+ final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt);
+ byteSegments.add(serializedVal[0]);
+ byteSegments.add(serializedVal[1]);
+ }
+
+ // But always write the value delimiter. If a value is missing, you'll see two delimiters next to each other.
+ byteSegments.add(DELIM_BYTES);
}
- i++;
- }
+
+ return concat(byteSegments);
+ } catch (RyaTypeResolverException e) {
+ throw new BindingSetConversionException("Could not convert the BindingSet into a byte[].", e);
+ }
+ }
- return byteArray;
- }
+ @Override
+ public BindingSet convert(byte[] bindingSetBytes, VariableOrder varOrder) throws BindingSetConversionException {
+ checkNotNull(bindingSetBytes);
+ checkNotNull(varOrder);
+
+ try {
+ // Slice the row into bindings.
+ List<byte[]> values = splitlByDelimByte(bindingSetBytes);
+ String[] varOrderStrings = varOrder.toArray();
+ checkArgument(values.size() == varOrderStrings.length);
+
+ // Convert the Binding bytes into a BindingSet.
+ final QueryBindingSet bindingSet = new QueryBindingSet();
+
+ for(int i = 0; i < varOrderStrings.length; i++) {
+ byte[] valueBytes = values.get(i);
+ if(valueBytes.length > 0) {
+ String name = varOrderStrings[i];
+ Value value = deserializeValue(valueBytes);
+ bindingSet.addBinding(name, value);
+ }
+ }
+
+ return bindingSet;
+ } catch (RyaTypeResolverException e) {
+ throw new BindingSetConversionException("Could not convert the byte[] into a BindingSet.", e);
+ }
+ }
+
+ /**
+ * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
+ * are a subset of the variable names in {@link VariableOrder}.
+ *
+ * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
+ * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+ * @throws IllegalArgumentException Indicates the names of the bindings are
+ * not a subset of the variable order.
+ */
+ private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
+ checkNotNull(bindingSet);
+ checkNotNull(varOrder);
- /**
- *
- * @param row byte rowId (read from Accumulo {@link Key})
- * @param varOrder indicates the order in which values are written in row
- * @return {@link BindingSet} formed from serialized values in row and variables in varOrder
- * @throws RyaTypeResolverException
- */
- public static BindingSet deSerialize(byte[] row, String[] varOrder) throws RyaTypeResolverException {
- Preconditions.checkNotNull(row);
- Preconditions.checkNotNull(varOrder);
- final int lastIndex = Bytes.lastIndexOf(row, DELIM_BYTE);
- Preconditions.checkArgument(lastIndex >= 0);
- final List<byte[]> byteList = getByteValues(Arrays.copyOf(row, lastIndex), new ArrayList<byte[]>());
- final QueryBindingSet bs = new QueryBindingSet();
- Preconditions.checkArgument(byteList.size() == varOrder.length);
- for(int i = 0; i < byteList.size(); i++) {
- bs.addBinding(varOrder[i], getValue(byteList.get(i)));
- }
- return bs;
- }
+ Set<String> bindingNames = bindingSet.getBindingNames();
+ List<String> varNames = varOrder.getVariableOrders();
+ checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
+ }
+
+ private static final byte[] concat(Iterable<byte[]> byteSegments) {
+ checkNotNull(byteSegments);
- private static List<byte[]> getByteValues(byte[] row, List<byte[]> byteList) {
- final int firstIndex = Bytes.indexOf(row, DELIM_BYTE);
- if(firstIndex < 0) {
- byteList.add(row);
- return byteList;
- } else {
- byteList.add(Arrays.copyOf(row, firstIndex));
- getByteValues(Arrays.copyOfRange(row, firstIndex+1, row.length), byteList);
- }
+ // Allocate a byte array that is able to hold the segments.
+ int length = 0;
+ for(byte[] byteSegment : byteSegments) {
+ length += byteSegment.length;
+ }
+ byte[] result = new byte[length];
+
+ // Copy the segments to the byte array and return it.
+ ByteBuffer buff = ByteBuffer.wrap(result);
+ for(byte[] byteSegment : byteSegments) {
+ buff.put(byteSegment);
+ }
+ return result;
+ }
+
+ private static List<byte[]> splitlByDelimByte(byte[] bindingSetBytes) {
+ checkNotNull(bindingSetBytes);
- return byteList;
+ List<byte[]> values = new LinkedList<>();
+
+ ByteBuffer buff = ByteBuffer.wrap(bindingSetBytes);
+ int start = 0;
+ while(buff.hasRemaining()) {
+ if(buff.get() == DELIM_BYTE) {
+ // Mark the position of the value delimiter.
+ int end = buff.position();
+
+ // Move to the start of the value and copy the bytes into an array.
+ byte[] valueBytes = new byte[(end - start) -1];
+ buff.position(start);
+ buff.get(valueBytes);
+ buff.position(end);
+ values.add(valueBytes);
+
+ // Move the start of the next value to the end of this one.
+ start = end;
+ }
+ }
+
+ return values;
}
- private static Value getValue(byte[] byteVal) throws RyaTypeResolverException {
-
+ private static Value deserializeValue(byte[] byteVal) throws RyaTypeResolverException {
final int typeIndex = Bytes.indexOf(byteVal, TYPE_DELIM_BYTE);
- Preconditions.checkArgument(typeIndex >= 0);
+ checkArgument(typeIndex >= 0);
final byte[] data = Arrays.copyOf(byteVal, typeIndex);
final byte[] type = Arrays.copyOfRange(byteVal, typeIndex, byteVal.length);
final RyaType rt = RyaContext.getInstance().deserialize(Bytes.concat(data,type));
return RyaToRdfConversions.convertValue(rt);
-
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetConverter.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetConverter.java
new file mode 100644
index 0000000..9c10dfa
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetConverter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.tupleSet;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Converts {@link BindingSet}s into other representations. This library is
+ * intended to convert between BindingSet and whatever format it is being
+ * stored as. These formats are often optimized for query evaluation.
+ *
+ * @param <T> Defines the type of model {@link BindingSet}s will be converted into/from.
+ */
+@ParametersAreNonnullByDefault
+public interface BindingSetConverter<T> {
+
+ /**
+ * Converts a {@link BindingSet} into the target model. The target model
+ * may not include every {@link Binding} that was in the original BindingSet,
+ * it may not include the binding names, and it may order the binding values.
+ * All of this information is specified using a {@link VariableOrder}.
+ * <p>
+ * Because the resulting model may not include the binding names from the
+ * original object, you must hold onto that information if you want to
+ * convert the resulting model back into a BindingSet later. Because the
+ * resulting model may only contain a subset of the original BindingSet's
+ * bindings, some information may be lost, so you may not be able to convert
+ * the target model back into the original BindingSet.
+ *
+ * @param bindingSet - The BindingSet that will be converted. (not null)
+ * @param varOrder - Which bindings and in what order they will appear in the
+ * resulting model. (not null)
+ * @return The BindingSet formatted as the target model.
+ * @throws BindingSetConversionException The BindingSet was unable to be
+ * converted into the target model. This will happen if the BindingSet has
+ * a binding whose name is not in the VariableOrder or if one of the values
+ * could not be converted into the target model.
+ */
+ public T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+ /**
+ * Converts the target model representation of a {@link BindingSet} as is
+ * created by {@link #convert(BindingSet, VariableOrder)} back into a
+ * BindingSet.
+ * <p>
+ * You must provide the Binding names and the order they were written to
+ * by using a {@link VariableOrder}.
+ * <p>
+ * If there is no value for one of the variable order names, then that binding
+ * will be missing from the resulting BindingSet.
+ *
+ * @param bindingSet - The BindingSet formatted as the target model that will
+ * be converted. (not null)
+ * @param varOrder - The VariableOrder that was used to create the target model. (not null)
+ * @return The {@link BindingSet} representation of the target model.
+ * @throws BindingSetConversionException The target model was unable to be
+ * converted back into a BindingSet.
+ */
+ public BindingSet convert(T bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+ /**
+ * One of the conversion methods of {@link BindingSetConverter} was unable to
+ * convert the {@link BindingSet} to/from the converted model.
+ */
+ public static class BindingSetConversionException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link BindingSetConversionException}.
+ *
+ * @param message - Describes why this exception was thrown.
+ */
+ public BindingSetConversionException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link BindingSetConversionException}.
+ *
+ * @param message - Describes why this exception was thrown.
+ * @param cause - The exception that caused this one to be thrown.
+ */
+ public BindingSetConversionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverter.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverter.java
index d28186d..6d69d5b 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverter.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverter.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package mvm.rya.indexing.external.tupleSet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import java.util.Iterator;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import javax.annotation.ParametersAreNonnullByDefault;
+
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
@@ -36,89 +37,68 @@ import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import com.google.common.base.Joiner;
+
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
- * Converts {@link BindingSet}s to Strings and back again.
+ * Converts {@link BindingSet}s to Strings and back again. The Strings do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
*/
-public class BindingSetStringConverter {
+@ParametersAreNonnullByDefault
+public class BindingSetStringConverter implements BindingSetConverter<String> {
- private static final String BINDING_DELIM = ":::";
- private static final String TYPE_DELIM = "<<~>>";
+ public static final String BINDING_DELIM = ":::";
+ public static final String TYPE_DELIM = "<<~>>";
+ public static final String NULL_VALUE_STRING = Character.toString( '\0' );
private static final ValueFactory valueFactory = new ValueFactoryImpl();
- /**
- * Converts a {@link BindingSet} to a String. You must provide the order the
- * {@link Binding}s will be written to.
- * </p>
- * The resulting string does not include the binding names from the original
- * object, so that must be kept with the resulting String if you want to
- * convert it back to a BindingSet later.
- * </p>
- *
- * @param bindingSet - The BindingSet that will be converted. (not null)
- * @param varOrder - The order the bindings will appear in the resulting String. (not null)
- * @return A {@code String} version of {@code bindingSet} whose binding are
- * ordered based on {@code varOrder}.
- */
- public static String toString(BindingSet bindingSet, VariableOrder varOrder) {
- checkSameVariables(bindingSet, varOrder);
-
- final StringBuilder bindingSetString = new StringBuilder();
-
- Iterator<String> it = varOrder.iterator();
- while(it.hasNext()) {
- // Add a value to the binding set.
- String varName = it.next();
- final Value value = bindingSet.getBinding(varName).getValue();
- final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
- bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() );
-
- // If there are more values to add, include a delimiter between them.
- if(it.hasNext()) {
- bindingSetString.append(BINDING_DELIM);
+ @Override
+ public String convert(BindingSet bindingSet, VariableOrder varOrder) {
+ checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+ // Convert each Binding to a String.
+ List<String> bindingStrings = new ArrayList<>();
+ for(String varName : varOrder) {
+ if(bindingSet.hasBinding(varName)) {
+ // Add a value to the binding set.
+ final Value value = bindingSet.getBinding(varName).getValue();
+ final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
+ String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+ bindingStrings.add(bindingString);
+ } else {
+ // Add a null value to the binding set.
+ bindingStrings.add(NULL_VALUE_STRING);
}
}
-
- return bindingSetString.toString();
+
+ // Join the bindings using the binding delim.
+ return Joiner.on(BINDING_DELIM).join(bindingStrings);
}
/**
* Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
- * match the variable names in the {@link VariableOrder}.
- *
+ * are a subset of the variable names in {@link VariableOrder}.
+ *
* @param bindingSet - The binding set whose Bindings will be inspected. (not null)
- * @param varOrder - The names of the bindings that must appear in the BindingSet. (not null)
- * @throws IllegalArgumentException Indicates the number of bindings did not match
- * the number of variables or that the binding names did not match the names
- * of the variables.
+ * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+ * @throws IllegalArgumentException Indicates the names of the bindings are
+ * not a subset of the variable order.
*/
- private static void checkSameVariables(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
+ private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
checkNotNull(bindingSet);
checkNotNull(varOrder);
Set<String> bindingNames = bindingSet.getBindingNames();
- List<String> varOrderList = varOrder.getVariableOrders();
- checkArgument(bindingNames.size() == varOrderList.size(), "The number of Bindings must match the length of the VariableOrder.");
- checkArgument(bindingNames.containsAll(varOrderList), "The names of the Bindings must match the variable names in VariableOrder.");
+ List<String> varNames = varOrder.getVariableOrders();
+ checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
}
- /**
- * Converts the String representation of a {@link BindingSet} as is created
- * by {@link #toString(BindingSet, VariableOrder)} back into a BindingSet.
- * <p>
- * You must provide the Binding names in the order they were written to.
- * </p>
- *
- * @param bindingSetString - The binding set values as a String. (not null)
- * @param varOrder - The order the bindings appear in the String version of
- * the BindingSet. (not null)
- * @return A {@link BindingSet} representation of the String.
- */
- public static BindingSet fromString(final String bindingSetString, final VariableOrder varOrder) {
+ @Override
+ public BindingSet convert(String bindingSetString, VariableOrder varOrder) {
checkNotNull(bindingSetString);
checkNotNull(varOrder);
@@ -128,9 +108,12 @@ public class BindingSetStringConverter {
final QueryBindingSet bindingSet = new QueryBindingSet();
for(int i = 0; i < bindingStrings.length; i++) {
- final String name = varOrrderArr[i];
- final Value value = toValue(bindingStrings[i]);
- bindingSet.addBinding(name, value);
+ String bindingString = bindingStrings[i];
+ if(!NULL_VALUE_STRING.equals(bindingString)) {
+ final String name = varOrrderArr[i];
+ final Value value = toValue(bindingStrings[i]);
+ bindingSet.addBinding(name, value);
+ }
}
return bindingSet;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java
index f87df51..fc940f6 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.external.tupleSet;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,6 @@ import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.Immutable;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
@@ -76,6 +74,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
+
/**
* Functions that create and maintain the PCJ tables that are used by Rya.
*/
@@ -137,6 +138,16 @@ public class PcjTables {
}
/**
+ * Constructs an instance of {@link VariableOrder}.
+ *
+ * @param varOrder - An ordered collection of Binding Set variables. (not null)
+ */
+ public VariableOrder(Collection<String> varOrder) {
+ checkNotNull(varOrder);
+ this.variableOrder = ImmutableList.copyOf(varOrder);
+ }
+
+ /**
* Constructs an instance of {@link VariableOrder}.
*
* @param varOrderString - The String representation of a VariableOrder. (not null)
@@ -621,17 +632,18 @@ public class PcjTables {
checkNotNull(result);
Set<Mutation> mutations = new HashSet<>();
-
+ AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
for(final VariableOrder varOrder : varOrders) {
try {
// Serialize the result to the variable order.
- byte[] serializedResult = AccumuloPcjSerializer.serialize(result, varOrder.toArray());
+ byte[] serializedResult = converter.convert(result, varOrder);
// Row ID = binding set values, Column Family = variable order of the binding set.
Mutation addResult = new Mutation(serializedResult);
addResult.put(varOrder.toString(), "", "");
mutations.add(addResult);
- } catch(RyaTypeResolverException e) {
+ } catch(BindingSetConversionException e) {
throw new PcjException("Could not serialize a result.", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java
new file mode 100644
index 0000000..e3ef741
--- /dev/null
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.external.PrecompJoinOptimizerIntegrationTest.CountingResultHandler;
+import mvm.rya.indexing.external.PrecompJoinOptimizerTest.NodeCollector;
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjVarOrderFactory;
+import mvm.rya.indexing.external.tupleSet.SimpleExternalTupleSet;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.SailException;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Optional;
+
+public class PCJOptionalTestIT {
+
+
+ private SailRepositoryConnection conn, pcjConn;
+ private SailRepository repo, pcjRepo;
+ private Connector accCon;
+ String tablePrefix = "table_";
+ URI sub, sub2, obj, obj2, subclass, subclass2, talksTo, sub3, subclass3;
+
+ @Before
+ public void init() throws RepositoryException,
+ TupleQueryResultHandlerException, QueryEvaluationException,
+ MalformedQueryException, AccumuloException,
+ AccumuloSecurityException, TableExistsException, RyaDAOException,
+ TableNotFoundException, InferenceEngineException {
+
+ repo = PcjIntegrationTestingUtil.getNonPcjRepo(tablePrefix, "instance");
+ conn = repo.getConnection();
+
+ pcjRepo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
+ pcjConn = pcjRepo.getConnection();
+
+ sub = new URIImpl("uri:entity");
+ subclass = new URIImpl("uri:class");
+ obj = new URIImpl("uri:obj");
+ talksTo = new URIImpl("uri:talksTo");
+
+ conn.add(sub, RDF.TYPE, subclass);
+ conn.add(sub, RDFS.LABEL, new LiteralImpl("label"));
+ conn.add(sub, talksTo, obj);
+
+ sub2 = new URIImpl("uri:entity2");
+ subclass2 = new URIImpl("uri:class2");
+ obj2 = new URIImpl("uri:obj2");
+ sub3 = new URIImpl("uri:entity3");
+ subclass3 = new URIImpl("uri:class3");
+
+
+ conn.add(sub2, RDF.TYPE, subclass2);
+ conn.add(sub2, RDFS.LABEL, new LiteralImpl("label2"));
+ conn.add(sub2, talksTo, obj2);
+ conn.add(sub3, RDF.TYPE, subclass3);
+ conn.add(sub3, RDFS.LABEL, new LiteralImpl("label3"));
+
+
+ accCon = new MockInstance("instance").getConnector("root",
+ new PasswordToken(""));
+
+ }
+
+ @After
+ public void close() throws RepositoryException, AccumuloException,
+ AccumuloSecurityException, TableNotFoundException {
+
+ PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
+ PcjIntegrationTestingUtil.closeAndShutdown(pcjConn, pcjRepo);
+ PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
+ PcjIntegrationTestingUtil.deleteIndexTables(accCon, 2, tablePrefix);
+
+ }
+
+
+ @Test
+ public void testEvaluateSingeIndexExactMatch()
+ throws TupleQueryResultHandlerException, QueryEvaluationException,
+ MalformedQueryException, RepositoryException, AccumuloException,
+ AccumuloSecurityException, TableExistsException, RyaDAOException,
+ SailException, TableNotFoundException, PcjException, InferenceEngineException {
+
+ String indexSparqlString = ""//
+ + "SELECT ?e ?c ?l ?o" //
+ + "{" //
+ + " ?e a ?c . "//
+ + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
+ + " OPTIONAL{?e <uri:talksTo> ?o } . "//
+ + "}";//
+
+ PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
+ + "INDEX_1", indexSparqlString, new String[] { "e", "c", "l", "o" },
+ Optional.<PcjVarOrderFactory> absent());
+ String queryString = ""//
+ + "SELECT ?e ?c ?l ?o " //
+ + "{" //
+ + " ?e a ?c . "//
+ + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+ + " OPTIONAL {?e <uri:talksTo> ?o } . "//
+ + "}";//
+
+ CountingResultHandler crh = new CountingResultHandler();
+ PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
+ PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
+ TupleQuery tupQuery = pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupQuery.evaluate(crh);
+
+ Assert.assertEquals(3, crh.getCount());
+
+ }
+
+
+
+ /**
+ * Creates a single PCJ over the {@code rdfs:label} pattern and its
+ * OPTIONAL {@code uri:talksTo} pattern, then evaluates a query made of
+ * those same patterns plus an additional {@code ?e a ?c} pattern and
+ * checks that three results are counted.
+ */
+ @Test
+ public void testEvaluateSingeIndex()
+ throws TupleQueryResultHandlerException, QueryEvaluationException,
+ MalformedQueryException, RepositoryException, AccumuloException,
+ AccumuloSecurityException, TableExistsException, RyaDAOException,
+ SailException, TableNotFoundException, PcjException, InferenceEngineException {
+
+ // SPARQL that defines the PCJ: the label pattern with an OPTIONAL talksTo pattern.
+ String indexSparqlString = ""//
+ + "SELECT ?e ?l ?o" //
+ + "{" //
+ + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
+ + " OPTIONAL{?e <uri:talksTo> ?o } . "//
+ + "}";//
+
+ // Build the PCJ table named <tablePrefix>INDEX_1 while the original data is still loaded.
+ PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
+ + "INDEX_1", indexSparqlString, new String[] { "e", "l", "o" },
+ Optional.<PcjVarOrderFactory> absent());
+ // The query repeats the PCJ's patterns and adds a type pattern the PCJ does not contain.
+ String queryString = ""//
+ + "SELECT ?e ?c ?l ?o " //
+ + "{" //
+ + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+ + " OPTIONAL {?e <uri:talksTo> ?o } . "//
+ + " ?e a ?c . "//
+ + "}";//
+
+ CountingResultHandler crh = new CountingResultHandler();
+ // Presumably removes the base triples so the label/talksTo portion can only
+ // be answered from the PCJ table - TODO confirm against PcjIntegrationTestingUtil.
+ PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
+ PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
+
+ // Re-open a repository and add back only the rdf:type statements, which
+ // correspond to the "?e a ?c" pattern of the query.
+ repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
+ conn = repo.getConnection();
+ conn.add(sub, RDF.TYPE, subclass);
+ conn.add(sub2, RDF.TYPE, subclass2);
+ conn.add(sub3, RDF.TYPE, subclass3);
+
+
+ // NOTE(review): the query is prepared on pcjConn, not on the conn re-opened
+ // just above - confirm that pcjConn is the intended connection here.
+ TupleQuery tupQuery = pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupQuery.evaluate(crh);
+
+ Assert.assertEquals(3, crh.getCount());
+
+ }
+
+
+
+
+
+
+ /**
+ * Registers the whole query (including its OPTIONAL clause) as the only
+ * PCJ and verifies that, after {@link PrecompJoinOptimizer#optimize}, the
+ * nodes gathered by {@code NodeCollector} are exactly that one external
+ * tuple set.
+ *
+ * @throws Exception if the query cannot be parsed or optimized
+ */
+ @Test
+ public void testSimpleOptionalTest1() throws Exception {
+
+ String query = ""//
+ + "SELECT ?u ?s ?t " //
+ + "{" //
+ + " ?s a ?t ."//
+ + " OPTIONAL{?t <http://www.w3.org/2000/01/rdf-schema#label> ?u } ."//
+ + " ?u <uri:talksTo> ?s . "//
+ + "}";//
+
+ final SPARQLParser parser = new SPARQLParser();
+
+ final ParsedQuery pq1 = parser.parseQuery(query, null);
+
+ // The PCJ is built from a clone of the query's own tuple expression, so
+ // the expression handed to the optimizer below is a separate object.
+ final SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet(
+ (Projection) pq1.getTupleExpr().clone());
+
+ // PCJs made available to the optimizer.
+ final List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>();
+
+ list.add(extTup1);
+
+ // Nodes expected to be collected from the optimized query.
+ final List<QueryModelNode> optTupNodes = Lists.newArrayList();
+ optTupNodes.add(extTup1);
+
+ final PrecompJoinOptimizer pcj = new PrecompJoinOptimizer(list, true);
+ final TupleExpr te = pq1.getTupleExpr();
+ pcj.optimize(te, null, null);
+
+ final NodeCollector nc = new NodeCollector();
+ te.visit(nc);
+
+ final List<QueryModelNode> qNodes = nc.getNodes();
+
+ // assertEquals takes (expected, actual); keeping that order makes a
+ // failure message report the two sizes the right way round.
+ Assert.assertEquals(optTupNodes.size(), qNodes.size());
+ for (final QueryModelNode node : qNodes) {
+ Assert.assertTrue(optTupNodes.contains(node));
+ }
+
+ }
+
+
+ /**
+ * Optimizes a query that contains the PCJ's patterns (with an OPTIONAL
+ * clause) plus one extra statement pattern, using a PCJ written with
+ * different variable names. Verifies that the collected nodes contain
+ * the PCJ expressed in the query's variable names and that exactly one
+ * further node is collected.
+ *
+ * @throws Exception if a query cannot be parsed or optimized
+ */
+ @Test
+ public void testSimpleOptionalTest2() throws Exception {
+
+ // Query under test: the PCJ's three patterns plus an extra "?s a ?u".
+ String query = ""//
+ + "SELECT ?u ?s ?t " //
+ + "{" //
+ + " ?s a ?t ."//
+ + " OPTIONAL{?t <http://www.w3.org/2000/01/rdf-schema#label> ?u } ."//
+ + " ?u <uri:talksTo> ?s . "//
+ + " ?s a ?u ."//
+ + "}";//
+
+
+ // The PCJ handed to the optimizer, written with variables ?d ?b ?c.
+ String pcj = ""//
+ + "SELECT ?d ?b ?c " //
+ + "{" //
+ + " ?b a ?c ."//
+ + " OPTIONAL{?c <http://www.w3.org/2000/01/rdf-schema#label> ?d } ."//
+ + " ?d <uri:talksTo> ?b . "//
+ + "}";//
+
+
+ // The same PCJ written with the query's variable names; this is the
+ // form the test expects to find in the optimized query.
+ String relabel_pcj = ""//
+ + "SELECT ?u ?s ?t " //
+ + "{" //
+ + " ?s a ?t ."//
+ + " OPTIONAL{?t <http://www.w3.org/2000/01/rdf-schema#label> ?u } ."//
+ + " ?u <uri:talksTo> ?s . "//
+ + "}";//
+
+
+ final SPARQLParser parser = new SPARQLParser();
+
+ final ParsedQuery pq1 = parser.parseQuery(query, null);
+ final ParsedQuery pq2 = parser.parseQuery(pcj, null);
+ final ParsedQuery pq3 = parser.parseQuery(relabel_pcj, null);
+
+ final SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet(
+ (Projection) pq2.getTupleExpr());
+ final SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet(
+ (Projection) pq3.getTupleExpr());
+
+ // Only the ?d ?b ?c form is given to the optimizer.
+ final List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>();
+
+ list.add(extTup1);
+
+ // Only the relabelled form is expected in the result.
+ final List<QueryModelNode> optTupNodes = Lists.newArrayList();
+ optTupNodes.add(extTup2);
+
+ final PrecompJoinOptimizer opt = new PrecompJoinOptimizer(list, true);
+ final TupleExpr te = pq1.getTupleExpr();
+ opt.optimize(te, null, null);
+
+ final NodeCollector nc = new NodeCollector();
+ te.visit(nc);
+
+ final List<QueryModelNode> qNodes = nc.getNodes();
+
+ // assertEquals takes (expected, actual). The "+ 1" is presumably the
+ // leftover "?s a ?u" pattern that the PCJ does not cover - TODO confirm.
+ Assert.assertEquals(optTupNodes.size() + 1, qNodes.size());
+ for (QueryModelNode node : optTupNodes) {
+ Assert.assertTrue(qNodes.contains(node));
+ }
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java
index 42e8b09..1be88e2 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java
@@ -31,6 +31,7 @@ import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.sail.config.RyaSailFactory;
import mvm.rya.indexing.accumulo.ConfigUtils;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
import mvm.rya.indexing.external.tupleSet.PcjTables;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
@@ -70,6 +71,8 @@ import com.google.common.collect.Sets;
public class PcjIntegrationTestingUtil {
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
public static Set<QueryModelNode> getTupleSets(TupleExpr te) {
final ExternalTupleVisitor etv = new ExternalTupleVisitor();
te.visit(etv);
@@ -386,20 +389,18 @@ public class PcjIntegrationTestingUtil {
for (final VariableOrder varOrder : varOrders) {
try {
// Serialize the result to the variable order.
- byte[] serializedResult = AccumuloPcjSerializer.serialize(
- result, varOrder.toArray());
+ byte[] serializedResult = converter.convert(result, varOrder);
// Row ID = binding set values, Column Family = variable order
// of the binding set.
Mutation addResult = new Mutation(serializedResult);
addResult.put(varOrder.toString(), "", "");
mutations.add(addResult);
- } catch (RyaTypeResolverException e) {
+ } catch (BindingSetConversionException e) {
throw new PcjException("Could not serialize a result.", e);
}
}
return mutations;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerTest.java
index 6e8c721..9428ae4 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerTest.java
@@ -174,6 +174,8 @@ public class PrecompJoinOptimizerTest {
}
+
+
@Test
public void testSingleIndex2() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerialzerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerialzerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerialzerTest.java
index 2fcacb0..4efbb30 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerialzerTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerialzerTest.java
@@ -20,99 +20,141 @@ package mvm.rya.indexing.external.tupleSet;
*/
import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
-import com.vividsolutions.jts.util.Assert;
-
+/**
+ * Tests the methods of {@link AccumuloPcjSerializer}.
+ */
public class AccumuloPcjSerialzerTest {
+ /**
+ * The BindingSet has fewer Bindings than there are variables in the variable
+ * order, but they are all in the variable order. This is the case where
+ * the missing bindings were optional.
+ */
+ @Test
+ public void serialize_bindingsSubsetOfVarOrder() throws BindingSetConversionException {
+ // Setup the Binding Set.
+ final MapBindingSet originalBindingSet = new MapBindingSet();
+ originalBindingSet.addBinding("x", new URIImpl("http://a"));
+ originalBindingSet.addBinding("y", new URIImpl("http://b"));
+
+ // Setup the variable order.
+ final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b");
+
+ // Create the byte[] representation of the BindingSet.
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ byte[] serialized = converter.convert(originalBindingSet, varOrder);
+
+ // Deserialize the byte[] back into the binding set.
+ BindingSet deserialized = converter.convert(serialized, varOrder);
+
+ // Ensure the deserialized value matches the original.
+ assertEquals(originalBindingSet, deserialized);
+ }
+
+ /**
+ * The BindingSet has a Binding whose name is not in the variable order.
+ * This is illegal.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void serialize_bindingNotInVariableOrder() throws RyaTypeResolverException, BindingSetConversionException {
+ // Setup the Binding Set.
+ final MapBindingSet originalBindingSet = new MapBindingSet();
+ originalBindingSet.addBinding("x", new URIImpl("http://a"));
+ originalBindingSet.addBinding("y", new URIImpl("http://b"));
+ originalBindingSet.addBinding("z", new URIImpl("http://d"));
+
+ // Setup the variable order.
+ final VariableOrder varOrder = new VariableOrder("x", "y");
+
+ // Create the byte[] representation of the BindingSet. This will throw an exception.
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ converter.convert(originalBindingSet, varOrder);
+ }
+
@Test
- public void basicShortUriBsTest() {
+ public void basicShortUriBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new URIImpl("http://uri1"));
bs.addBinding("Y",new URIImpl("http://uri2"));
- final String[] varOrder = new String[]{"X","Y"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("X","Y");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
@Test
- public void basicLongUriBsTest() {
+ public void basicLongUriBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new URIImpl("http://uri1"));
bs.addBinding("Y",new URIImpl("http://uri2"));
bs.addBinding("Z",new URIImpl("http://uri3"));
bs.addBinding("A",new URIImpl("http://uri4"));
bs.addBinding("B",new URIImpl("http://uri5"));
- final String[] varOrder = new String[]{"X","Y","Z","A","B"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("X","Y","Z","A","B");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
@Test
- public void basicShortStringLiteralBsTest() {
+ public void basicShortStringLiteralBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new LiteralImpl("literal1"));
bs.addBinding("Y",new LiteralImpl("literal2"));
- final String[] varOrder = new String[]{"X","Y"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("X","Y");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
@Test
- public void basicShortMixLiteralBsTest() {
+ public void basicShortMixLiteralBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new LiteralImpl("literal1"));
bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
- final String[] varOrder = new String[]{"X","Y"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("X","Y");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
@Test
- public void basicLongMixLiteralBsTest() {
+ public void basicLongMixLiteralBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new LiteralImpl("literal1"));
bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
bs.addBinding("Z",new LiteralImpl("5.0", new URIImpl("http://www.w3.org/2001/XMLSchema#double")));
bs.addBinding("W",new LiteralImpl("1000", new URIImpl("http://www.w3.org/2001/XMLSchema#long")));
- final String[] varOrder = new String[]{"W","X","Y","Z"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("W","X","Y","Z");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
@Test
- public void basicMixUriLiteralBsTest() {
+ public void basicMixUriLiteralBsTest() throws BindingSetConversionException {
final QueryBindingSet bs = new QueryBindingSet();
bs.addBinding("X",new LiteralImpl("literal1"));
bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
@@ -121,13 +163,11 @@ public class AccumuloPcjSerialzerTest {
bs.addBinding("A",new URIImpl("http://uri1"));
bs.addBinding("B",new URIImpl("http://uri2"));
bs.addBinding("C",new URIImpl("http://uri3"));
- final String[] varOrder = new String[]{"A","W","X","Y","Z","B","C"};
- try {
- final byte[] byteVal = AccumuloPcjSerializer.serialize(bs, varOrder);
- final BindingSet newBs = AccumuloPcjSerializer.deSerialize(byteVal, varOrder);
- Assert.equals(bs, newBs);
- } catch (final RyaTypeResolverException e) {
- e.printStackTrace();
- }
+ final VariableOrder varOrder = new VariableOrder("A","W","X","Y","Z","B","C");
+
+ BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+ final byte[] byteVal = converter.convert(bs, varOrder);
+ final BindingSet newBs = converter.convert(byteVal, varOrder);
+ assertEquals(bs, newBs);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverterTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverterTest.java
index 0a7f399..725e557 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverterTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/BindingSetStringConverterTest.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package mvm.rya.indexing.external.tupleSet;
import static org.junit.Assert.assertEquals;
@@ -32,6 +31,7 @@ import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
@@ -40,7 +40,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
public class BindingSetStringConverterTest {
@Test
- public void toString_URIs() {
+ public void toString_URIs() throws BindingSetConversionException {
// Setup the binding set that will be converted.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new URIImpl("http://a"));
@@ -49,9 +49,10 @@ public class BindingSetStringConverterTest {
// Convert it to a String.
final VariableOrder varOrder = new VariableOrder("y", "z", "x");
- final String bindingSetString = BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final String bindingSetString = converter.convert(originalBindingSet, varOrder);
- // Ensure it converted to the expected result.
+ // Ensure it converted to the expected result.
final String expected =
"http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
"http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
@@ -61,14 +62,15 @@ public class BindingSetStringConverterTest {
}
@Test
- public void toString_Decimal() {
+ public void toString_Decimal() throws BindingSetConversionException {
// Setup the binding set that will be converted.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new DecimalLiteralImpl(new BigDecimal(2.5)));
// Convert it to a String.
final VariableOrder varOrder = new VariableOrder("x");
- final String bindingSetString = BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final String bindingSetString = converter.convert(originalBindingSet, varOrder);
// Ensure it converted to the expected result.
final String expected = "2.5<<~>>http://www.w3.org/2001/XMLSchema#decimal";
@@ -76,14 +78,15 @@ public class BindingSetStringConverterTest {
}
@Test
- public void toString_Boolean() {
+ public void toString_Boolean() throws BindingSetConversionException {
// Setup the binding set that will be converted.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new BooleanLiteralImpl(true));
// Convert it to a String.
final VariableOrder varOrder = new VariableOrder("x");
- final String bindingSetString = BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final String bindingSetString = converter.convert(originalBindingSet, varOrder);
// Ensure it converted to the expected result.
final String expected = "true<<~>>http://www.w3.org/2001/XMLSchema#boolean";
@@ -91,64 +94,96 @@ public class BindingSetStringConverterTest {
}
@Test
- public void toString_Integer() {
+ public void toString_Integer() throws BindingSetConversionException {
// Setup the binding set that will be converted.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new IntegerLiteralImpl(BigInteger.valueOf(5)));
// Convert it to a String.
final VariableOrder varOrder = new VariableOrder("x");
- final String bindingSetString = BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final String bindingSetString = converter.convert(originalBindingSet, varOrder);
// Ensure it converted to the expected result.
final String expected = "5<<~>>http://www.w3.org/2001/XMLSchema#integer";
assertEquals(expected, bindingSetString);
}
- @Test(expected = IllegalArgumentException.class)
- public void toString_varOrderTooShort() {
- // Setup the binding set that will be converted.
+ /**
+ * All of the Bindings in the BindingSet exactly match the variable order.
+ * This is the simplest case and is legal.
+ */
+ @Test
+ public void toString_bindingsMatchVarOrder() throws BindingSetConversionException {
+ // Setup the Binding Set.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new URIImpl("http://a"));
originalBindingSet.addBinding("y", new URIImpl("http://b"));
- // This variable order that is too short.
- final VariableOrder varOrder = new VariableOrder("y");
+ // Setup the variable order.
+ final VariableOrder varOrder = new VariableOrder("x", "y");
- // The conversion should throw an exception.
- BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ // Create the String representation of the BindingSet.
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ String bindingSetString = converter.convert(originalBindingSet, varOrder);
+
+ // Ensure the expected value was created.
+ String expected =
+ "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+ "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
+ assertEquals(expected, bindingSetString);
}
- @Test(expected = IllegalArgumentException.class)
- public void toString_varOrderTooLong() {
- // Setup the binding set that will be converted.
+ /**
+ * The BindingSet has fewer Bindings than there are variables in the variable
+ * order, but they are all in the variable order. This is the case where
+ * the missing bindings were optional.
+ */
+ @Test
+ public void toString_bindingsSubsetOfVarOrder() throws BindingSetConversionException {
+ // Setup the Binding Set.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new URIImpl("http://a"));
originalBindingSet.addBinding("y", new URIImpl("http://b"));
- // This variable order is too long.
- final VariableOrder varOrder = new VariableOrder("x", "y", "z");
+ // Setup the variable order.
+ final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b");
- // The conversion should throw an exception.
- BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ // Create the String representation of the BindingSet.
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ String bindingSetString = converter.convert(originalBindingSet, varOrder);
+
+ // Ensure the expected value was created.
+ String expected =
+ "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+ BindingSetStringConverter.NULL_VALUE_STRING + ":::" +
+ "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+ BindingSetStringConverter.NULL_VALUE_STRING;
+ assertEquals(expected, bindingSetString);
}
+ /**
+ * The BindingSet has a Binding whose name is not in the variable order.
+ * This is illegal.
+ */
@Test(expected = IllegalArgumentException.class)
- public void toString_varOrderWrongBindingNames() {
- // Setup the binding set that will be converted.
+ public void toString_bindingNotInVariableOrder() throws BindingSetConversionException {
+ // Setup the Binding Set.
final MapBindingSet originalBindingSet = new MapBindingSet();
originalBindingSet.addBinding("x", new URIImpl("http://a"));
originalBindingSet.addBinding("y", new URIImpl("http://b"));
+ originalBindingSet.addBinding("z", new URIImpl("http://d"));
- // This variable order has the wrong binding names.
- final VariableOrder varOrder = new VariableOrder("x", "a");
+ // Setup the variable order.
+ final VariableOrder varOrder = new VariableOrder("x", "y");
- // The conversion should throw an exception.
- BindingSetStringConverter.toString(originalBindingSet, varOrder);
+ // Create the String representation of the BindingSet. This will throw an exception.
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ converter.convert(originalBindingSet, varOrder);
}
@Test
- public void fromString() {
+ public void fromString() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString =
"http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
@@ -157,7 +192,8 @@ public class BindingSetStringConverterTest {
// Convert it to a BindingSet
final VariableOrder varOrder = new VariableOrder("y", "z", "x");
- final BindingSet bindingSet = BindingSetStringConverter.fromString(bindingSetString, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
// Ensure it converted to the expected result.
final MapBindingSet expected = new MapBindingSet();
@@ -168,13 +204,40 @@ public class BindingSetStringConverterTest {
assertEquals(expected, bindingSet);
}
+ /**
+ * Ensures that when a binding set is converted from a String back to a
+ * BindingSet, null values do not get converted into Bindings.
+ */
+ @Test
+ public void fromString_nullValues() throws BindingSetConversionException {
+ // Setup the String that will be converted.
+ final String bindingSetString =
+ "http://value 1<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+ BindingSetStringConverter.NULL_VALUE_STRING + ":::" +
+ "http://value 2<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
+ BindingSetStringConverter.NULL_VALUE_STRING;
+
+ // Convert it to a BindingSet
+ VariableOrder varOrder = new VariableOrder("x", "a", "y", "b");
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
+
+ // Ensure it converted to the expected result.
+ final MapBindingSet expected = new MapBindingSet();
+ expected.addBinding("x", new URIImpl("http://value 1"));
+ expected.addBinding("y", new URIImpl("http://value 2"));
+
+ assertEquals(expected, bindingSet);
+ }
+
@Test
- public void fromString_Decimal() {
+ public void fromString_Decimal() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString = "2.5<<~>>http://www.w3.org/2001/XMLSchema#decimal";
// Convert it to a BindingSet
- final BindingSet bindingSet = BindingSetStringConverter.fromString(bindingSetString, new VariableOrder("x"));
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final BindingSet bindingSet = converter.convert(bindingSetString, new VariableOrder("x"));
// Ensure it converted to the expected result.
final MapBindingSet expected = new MapBindingSet();
@@ -184,12 +247,13 @@ public class BindingSetStringConverterTest {
}
@Test
- public void fromString_Boolean() {
+ public void fromString_Boolean() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString = "true<<~>>http://www.w3.org/2001/XMLSchema#boolean";
// Convert it to a BindingSet
- final BindingSet bindingSet = BindingSetStringConverter.fromString(bindingSetString, new VariableOrder("x"));
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final BindingSet bindingSet = converter.convert(bindingSetString, new VariableOrder("x"));
// Ensure it converted to the expected result.
final MapBindingSet expected = new MapBindingSet();
@@ -199,12 +263,13 @@ public class BindingSetStringConverterTest {
}
@Test
- public void fromString_Integer() {
+ public void fromString_Integer() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString = "5<<~>>http://www.w3.org/2001/XMLSchema#integer";
// Convert it to a BindingSet
- final BindingSet bindingSet = BindingSetStringConverter.fromString(bindingSetString, new VariableOrder("x"));
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ final BindingSet bindingSet = converter.convert(bindingSetString, new VariableOrder("x"));
// Ensure it converted to the expected result.
final MapBindingSet expected = new MapBindingSet();
@@ -214,7 +279,7 @@ public class BindingSetStringConverterTest {
}
@Test(expected = IllegalArgumentException.class)
- public void fromString_varOrderTooShort() {
+ public void fromString_varOrderTooShort() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString =
"http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
@@ -224,11 +289,12 @@ public class BindingSetStringConverterTest {
VariableOrder varOrder = new VariableOrder("x");
// The conversion should throw an exception.
- BindingSetStringConverter.fromString(bindingSetString, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ converter.convert(bindingSetString, varOrder);
}
@Test(expected = IllegalArgumentException.class)
- public void fromString_varOrderTooLong() {
+ public void fromString_varOrderTooLong() throws BindingSetConversionException {
// Setup the String that will be converted.
final String bindingSetString =
"http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
@@ -238,6 +304,7 @@ public class BindingSetStringConverterTest {
VariableOrder varOrder = new VariableOrder("x", "y", "z");
// The conversion should throw an exception.
- BindingSetStringConverter.fromString(bindingSetString, varOrder);
+ BindingSetConverter<String> converter = new BindingSetStringConverter();
+ converter.convert(bindingSetString, varOrder);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/PcjTablesIntegrationTests.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/PcjTablesIntegrationTests.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/PcjTablesIntegrationTests.java
index e3adc16..bb21d33 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/PcjTablesIntegrationTests.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/PcjTablesIntegrationTests.java
@@ -33,6 +33,7 @@ import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory;
@@ -83,6 +84,8 @@ import com.google.common.io.Files;
public class PcjTablesIntegrationTests {
private static final Logger log = Logger.getLogger(PcjTablesIntegrationTests.class);
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
protected static final String RYA_TABLE_PREFIX = "demo_";
// Rya data store and connections.
@@ -137,7 +140,7 @@ public class PcjTablesIntegrationTests {
* The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)}
*/
@Test
- public void addResults() throws PcjException, TableNotFoundException, RyaTypeResolverException {
+ public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException {
final String sparql =
"SELECT ?name ?age " +
"{" +
@@ -189,7 +192,7 @@ public class PcjTablesIntegrationTests {
* The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)}
*/
@Test
- public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, RyaTypeResolverException {
+ public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException {
// Load some Triples into Rya.
Set<Statement> triples = new HashSet<>();
triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
@@ -257,7 +260,7 @@ public class PcjTablesIntegrationTests {
* The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)}
*/
@Test
- public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, RyaTypeResolverException {
+ public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException {
// Load some Triples into Rya.
Set<Statement> triples = new HashSet<>();
triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
@@ -322,7 +325,7 @@ public class PcjTablesIntegrationTests {
* multimap stores a set of deserialized binding sets that were in the PCJ
* table for every variable order that is found in the PCJ metadata.
*/
- private static Multimap<String, BindingSet> loadPcjResults(Connector accumuloConn, String pcjTableName) throws PcjException, TableNotFoundException, RyaTypeResolverException {
+ private static Multimap<String, BindingSet> loadPcjResults(Connector accumuloConn, String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException {
Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
// Get the variable orders the data was written to.
@@ -336,7 +339,7 @@ public class PcjTablesIntegrationTests {
for(Entry<Key, Value> entry : scanner) {
byte[] serializedResult = entry.getKey().getRow().getBytes();
- BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray());
+ BindingSet result = converter.convert(serializedResult, varOrder);
fetchedResults.put(varOrder.toString(), result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index b99d293..12e32b6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -36,10 +36,12 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.impl.MapBindingSet;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.sail.SailConnection;
@@ -48,6 +50,7 @@ import org.openrdf.sail.SailException;
import info.aduna.iteration.CloseableIteration;
import io.fluo.api.client.FluoClient;
import io.fluo.api.types.TypedTransaction;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
import mvm.rya.indexing.external.tupleSet.PcjTables;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory;
@@ -229,13 +232,21 @@ public class CreatePcj {
checkNotNull(spMetadata);
checkNotNull(batch);
+ BindingSetStringConverter converter = new BindingSetStringConverter();
+
try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) {
// Get the node's variable order.
final String spNodeId = spMetadata.getNodeId();
- final String[] varOrder = spMetadata.getVariableOrder().toArray();
+ final VariableOrder varOrder = spMetadata.getVariableOrder();
for(final BindingSet bindingSet : batch) {
- final String bindingSetStr = FluoStringConverter.toBindingSetString(bindingSet, varOrder);
+ MapBindingSet spBindingSet = new MapBindingSet();
+ for(String var : varOrder) {
+ Binding binding = bindingSet.getBinding(var);
+ spBindingSet.addBinding(binding);
+ }
+
+ String bindingSetStr = converter.convert(spBindingSet, varOrder);
// Write the binding set entry to Fluo for the statement pattern.
tx.mutate().row(spNodeId + NODEID_BS_DELIM + bindingSetStr)
[4/4] incubator-rya git commit: Merge branch
'RYA-50-Add-Support-for-Sparql-Optional-Statements-in-Precomputed-Joins' of
https://github.com/meiercaleb/incubator-rya into RYA-50
Posted by pu...@apache.org.
Merge branch 'RYA-50-Add-Support-for-Sparql-Optional-Statements-in-Precomputed-Joins' of https://github.com/meiercaleb/incubator-rya into RYA-50
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/30ca57ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/30ca57ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/30ca57ed
Branch: refs/heads/develop
Commit: 30ca57ede51ad346636593fa48a355b3a9f06e9b
Parents: 1d92d19 a24515b
Author: pujav65 <pu...@gmail.com>
Authored: Wed Apr 6 16:53:18 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Wed Apr 6 16:53:18 2016 -0400
----------------------------------------------------------------------
.../accumulo/precompQuery/AccumuloPcjQuery.java | 40 +-
.../indexing/external/PrecompJoinOptimizer.java | 20 +
.../external/tupleSet/AccumuloIndexSet.java | 4 +-
.../tupleSet/AccumuloPcjSerializer.java | 199 ++++++----
.../external/tupleSet/BindingSetConverter.java | 108 +++++
.../tupleSet/BindingSetStringConverter.java | 111 +++---
.../indexing/external/tupleSet/PcjTables.java | 26 +-
.../indexing/external/PCJOptionalTestIT.java | 324 +++++++++++++++
.../external/PcjIntegrationTestingUtil.java | 9 +-
.../external/PrecompJoinOptimizerTest.java | 2 +
.../tupleSet/AccumuloPcjSerialzerTest.java | 154 +++++---
.../tupleSet/BindingSetStringConverterTest.java | 151 +++++--
.../tupleSet/PcjTablesIntegrationTests.java | 13 +-
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 15 +-
.../indexing/pcj/fluo/app/BindingSetRow.java | 23 +-
.../pcj/fluo/app/FilterResultUpdater.java | 10 +-
.../pcj/fluo/app/FluoStringConverter.java | 152 -------
.../pcj/fluo/app/JoinResultUpdater.java | 395 +++++++++++++------
.../pcj/fluo/app/QueryResultUpdater.java | 10 +-
.../fluo/app/observers/BindingSetUpdater.java | 7 +-
.../pcj/fluo/app/observers/FilterObserver.java | 10 +-
.../pcj/fluo/app/observers/JoinObserver.java | 10 +-
.../fluo/app/observers/QueryResultObserver.java | 9 +-
.../app/observers/StatementPatternObserver.java | 10 +-
.../pcj/fluo/app/query/FluoQueryColumns.java | 2 +
.../fluo/app/query/FluoQueryMetadataDAO.java | 7 +
.../pcj/fluo/app/query/JoinMetadata.java | 35 ++
.../fluo/app/query/SparqlFluoQueryBuilder.java | 64 +--
.../pcj/fluo/app/FluoStringConverterTest.java | 74 ----
.../pcj/fluo/app/LeftOuterJoinTest.java | 174 ++++++++
.../indexing/pcj/fluo/app/NaturalJoinTest.java | 166 ++++++++
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 7 +-
.../apache/rya/indexing/pcj/fluo/ITBase.java | 9 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 2 +
.../indexing/pcj/fluo/integration/QueryIT.java | 44 +++
.../pcj/fluo/integration/RyaExportIT.java | 7 +-
36 files changed, 1730 insertions(+), 673 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-rya git commit: RYA-50 Updated Rya PCJs to support
SPARQL queries with OPTIONALs.
Posted by pu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
index 859cd4b..85f1b1f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -19,7 +19,6 @@
package org.apache.rya.indexing.pcj.fluo.app;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
import javax.annotation.ParametersAreNonnullByDefault;
@@ -35,17 +34,17 @@ import io.fluo.api.data.Bytes;
@ParametersAreNonnullByDefault
public class BindingSetRow {
private final String nodeId;
- private final String[] bindingStrings;
+ private final String bindingSetString;
/**
* Constructs an instance of {@link BindingSetRow}.
*
* @param nodeId - The Node ID of a query node. (not null)
- * @param bindingStrings - A Binding Set that is part of the node's results. (not null)
+ * @param bindingSetString - A Binding Set that is part of the node's results. (not null)
*/
- public BindingSetRow(final String nodeId, final String[] bindingStrings) {
+ public BindingSetRow(final String nodeId, final String bindingSetString) {
this.nodeId = checkNotNull(nodeId);
- this.bindingStrings = checkNotNull(bindingStrings);
+ this.bindingSetString = checkNotNull(bindingSetString);
}
/**
@@ -56,11 +55,10 @@ public class BindingSetRow {
}
/**
- * @return A Binding Set that is part of the node's results. It is formatted
- * in SPO order and each String requires further interpretation.
+ * @return A Binding Set that is part of the node's results.
*/
- public String[] getBindingStrings() {
- return bindingStrings;
+ public String getBindingSetString() {
+ return bindingSetString;
}
/**
@@ -77,11 +75,10 @@ public class BindingSetRow {
if(rowArray.length != 2) {
throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM.");
}
- final String nodeId = rowArray[0];
- // Read the row's Binding Set from the bytes.
- final String[] bindingStrings = rowArray[1].split(DELIM);
+ final String nodeId = rowArray[0];
+ String bindingSetString = rowArray[1];
- return new BindingSetRow(nodeId, bindingStrings);
+ return new BindingSetRow(nodeId, bindingSetString);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 5ff5acc..3af079f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -62,6 +62,8 @@ public class FilterResultUpdater {
private final Encoder encoder = new StringEncoder();
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
/**
* A utility class used to search SPARQL queries for Filters.
*/
@@ -119,10 +121,12 @@ public class FilterResultUpdater {
final MapBindingSet filterBindingSet = new MapBindingSet();
for(final String bindingName : filterVarOrder) {
- final Binding binding = childBindingSet.getBinding(bindingName);
- filterBindingSet.addBinding(binding);
+ if(childBindingSet.hasBinding(bindingName)) {
+ final Binding binding = childBindingSet.getBinding(bindingName);
+ filterBindingSet.addBinding(binding);
+ }
}
- final String filterBindingSetString = BindingSetStringConverter.toString(filterBindingSet, filterVarOrder);
+ final String filterBindingSetString = converter.convert(filterBindingSet, filterVarOrder);
final Bytes row = encoder.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetString );
final Column col = FluoQueryColumns.FILTER_BINDING_SET;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 61e3d5f..aa7c959 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -18,32 +18,20 @@
*/
package org.apache.rya.indexing.pcj.fluo.app;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
-
-import java.util.Collection;
import javax.annotation.ParametersAreNonnullByDefault;
import org.openrdf.model.Literal;
import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.base.Joiner;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.resolver.RdfToRyaConversions;
@@ -55,117 +43,6 @@ import mvm.rya.api.resolver.RdfToRyaConversions;
@ParametersAreNonnullByDefault
public class FluoStringConverter {
- private static final ValueFactory valueFactory = new ValueFactoryImpl();
-
- /**
- * Converts an ordered collection of variables into the Variable Order
- * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
- * column of the Fluo application.
- *
- * @param varOrder - An ordered collection of variables. (not null)
- * @return The string representation of the variable order.
- */
- public static String toVarOrderString(final Collection<String> varOrder) {
- checkNotNull(varOrder);
- return Joiner.on(VAR_DELIM).join(varOrder);
- }
-
- /**
- * Converts an ordered array of variables into the Variable Order
- * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS}
- * column of the Fluo application.
- *
- * @param varOrder - An ordered array of variables. (not null)
- * @return The string representation of the variable order.
- */
- public static String toVarOrderString(final String... varOrder) {
- return Joiner.on(VAR_DELIM).join(varOrder);
- }
-
- /**
- * Converts a String into an array holding the Variable Order of a Binding Set.
- *
- * @param varOrderString - The string representation of the variable order. (not null)
- * @return An ordered array holding the variable order of a binding set.
- */
- public static String[] toVarOrder(final String varOrderString) {
- checkNotNull(varOrderString);
- return varOrderString.split(VAR_DELIM);
- }
-
- /**
- * Converts a {@link BindingSet} to the String representation that the Fluo
- * application serializes to the Binding Set columns.
- *
- * @param bindingSet - The binding set values. (not null)
- * @param varOrder - The order the variables must appear in. (not null)
- * @return A {@code String} version of {@code bindingSet} suitable for
- * serialization to one of the Fluo application's binding set columns.
- */
- public static String toBindingSetString(final BindingSet bindingSet, final String[] varOrder) {
- checkNotNull(bindingSet);
- checkNotNull(varOrder);
-
- final StringBuilder bindingSetString = new StringBuilder();
-
- for(int i = 0; i < varOrder.length; i++) {
- // Add a value to the binding set.
- final String varName = varOrder[i];
- final Value value = bindingSet.getBinding(varName).getValue();
- final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
- bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() );
-
- // If there are more values to add, include a delimiter between them.
- if(i != varOrder.length-1) {
- bindingSetString.append(DELIM);
- }
- }
-
- return bindingSetString.toString();
- }
-
- /**
- * Converts the String representation of a {@link BindingSet} as is created
- * by {@link #toBindingSetString(BindingSet, String[])} back into a
- * BindingSet.
- *
- * @param bindingSetString - The binding set values as a String. (not null)
- * @param varOrder - The order the variables appear in the String version of
- * the BindingSet. (not null)
- * @return A {@link BindingSet} representation of the String.
- */
- public static BindingSet toBindingSet(final String bindingSetString, final String[] varOrder) {
- checkNotNull(bindingSetString);
- checkNotNull(varOrder);
-
- final String[] bindingStrings = toBindingStrings(bindingSetString);
- return toBindingSet(bindingStrings, varOrder);
- }
-
- /**
- * Creates a {@link BindingSet} from an ordered array of Strings that represent
- * {@link Binding}s and their variable names.
- *
- * @param bindingStrings - An ordered array of Strings representing {@link Binding}s. (not null)
- * @param varOrder - An ordered array of variable names for the binding strings. (not null)
- * @return The parameters converted into a {@link BindingSet}.
- */
- public static BindingSet toBindingSet(final String[] bindingStrings, final String[] varOrder) {
- checkNotNull(varOrder);
- checkNotNull(bindingStrings);
- checkArgument(varOrder.length == bindingStrings.length);
-
- final QueryBindingSet bindingSet = new QueryBindingSet();
-
- for(int i = 0; i < bindingStrings.length; i++) {
- final String name = varOrder[i];
- final Value value = FluoStringConverter.toValue(bindingStrings[i]);
- bindingSet.addBinding(name, value);
- }
-
- return bindingSet;
- }
-
/**
* Extract the {@link Binding} strings from a {@link BindingSet}'s string form.
*
@@ -178,35 +55,6 @@ public class FluoStringConverter {
}
/**
- * Creates a {@link Value} from a String representation of it.
- *
- * @param valueString - The String representation of the value. (not null)
- * @return The {@link Value} representation of the String.
- */
- public static Value toValue(final String valueString) {
- checkNotNull(valueString);
-
- // Split the String that was stored in Fluo into its Value and Type parts.
- final String[] valueAndType = valueString.split(TYPE_DELIM);
- if(valueAndType.length != 2) {
- throw new IllegalArgumentException("Array must contain data and type info!");
- }
-
- final String dataString = valueAndType[0];
- final String typeString = valueAndType[1];
-
- // Convert the String Type into a URI that describes the type.
- final URI typeURI = valueFactory.createURI(typeString);
-
- // Convert the String Value into a Value.
- final Value value = typeURI.equals(XMLSchema.ANYURI) ?
- valueFactory.createURI(dataString) :
- valueFactory.createLiteral(dataString, new URIImpl(typeString));
-
- return value;
- }
-
- /**
* Converts the String representation of a {@link StatementPattern} back
* into the object version.
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index a25cb92..bc558a5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -25,18 +25,21 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.config.ScannerConfiguration;
@@ -47,6 +50,9 @@ import io.fluo.api.iterator.ColumnIterator;
import io.fluo.api.iterator.RowIterator;
import io.fluo.api.types.Encoder;
import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
@@ -55,7 +61,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
@ParametersAreNonnullByDefault
public class JoinResultUpdater {
- private static final Logger log = Logger.getLogger(JoinResultUpdater.class);
+
+ private static final BindingSetConverter<String> converter = new BindingSetStringConverter();
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
private final Encoder encoder = new StringEncoder();
@@ -67,93 +74,112 @@ public class JoinResultUpdater {
* @param tx - The transaction all Fluo queries will use. (not null)
* @param childId - The Node ID of the child whose results received a new Binding Set. (not null)
* @param childBindingSet - The Binding Set that was just emitted by child node. (not null)
- * @param joinMetadata - The metadatat for the Join that has been notified. (not null)
+ * @param joinMetadata - The metadata for the Join that has been notified. (not null)
+ * @throws BindingSetConversionException
*/
public void updateJoinResults(
final TransactionBase tx,
final String childId,
final BindingSet childBindingSet,
- final JoinMetadata joinMetadata) {
+ final JoinMetadata joinMetadata) throws BindingSetConversionException {
checkNotNull(tx);
checkNotNull(childId);
+ checkNotNull(childBindingSet);
checkNotNull(joinMetadata);
- // Read the Join metadata from the Fluo table.
- final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray();
+ // Figure out which join algorithm we are going to use.
+ final IterativeJoin joinAlgorithm;
+ switch(joinMetadata.getJoinType()) {
+ case NATURAL_JOIN:
+ joinAlgorithm = new NaturalJoin();
+ break;
+ case LEFT_OUTER_JOIN:
+ joinAlgorithm = new LeftOuterJoin();
+ break;
+ default:
+ throw new RuntimeException("Unsupported JoinType: " + joinMetadata.getJoinType());
+ }
+
+ // Figure out which side of the join the new binding set appeared on.
+ final Side emittingSide;
+ final String siblingId;
- // Transform the Child binding set and varaible order values to be easier to work with.
- final VariableOrder childVarOrder = getVarOrder(tx, childId);
- final String[] childVarOrderArray = childVarOrder.toArray();
- final String childBindingSetString = FluoStringConverter.toBindingSetString(childBindingSet, childVarOrder.toArray());
- final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
+ if(childId.equals(joinMetadata.getLeftChildNodeId())) {
+ emittingSide = Side.LEFT;
+ siblingId = joinMetadata.getRightChildNodeId();
+ } else {
+ emittingSide = Side.RIGHT;
+ siblingId = joinMetadata.getLeftChildNodeId();
+ }
- // Transform the Sibling binding set and varaible order values to be easier to work with.
- final String leftChildId = joinMetadata.getLeftChildNodeId();
- final String rightChildId = joinMetadata.getRightChildNodeId();
- final String siblingId = leftChildId.equals(childId) ? rightChildId : leftChildId;
+ // Iterates over the sibling node's BindingSets that join with the new binding set.
+ FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
- final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
- final String[] siblingVarOrderArray = siblingVarOrder.toArray();
-
- // Create a map that will be used later in this algorithm to create new Join result
- // Binding Sets. It is initialized with all of the values that are in childBindingSet.
- // The common values and any that are added on by the sibling will be overwritten
- // for each sibling scan result.
- final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
- final Map<String, String> joinBindingSet = Maps.newHashMap();
- for(int i = 0; i < childVarOrderArray.length; i++) {
- joinBindingSet.put(childVarOrderArray[i], childBindingStrings[i]);
+ // Iterates over the resulting BindingSets from the join.
+ final Iterator<BindingSet> newJoinResults;
+ if(emittingSide == Side.LEFT) {
+ newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
+ } else {
+ newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet);
}
+ // Insert the new join binding sets to the Fluo table.
+ VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+ while(newJoinResults.hasNext()) {
+ BindingSet newJoinResult = newJoinResults.next();
+ String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder);
+
+ final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
+ final Column col = FluoQueryColumns.JOIN_BINDING_SET;
+ final Bytes value = encoder.encode(joinBindingSetString);
+ tx.set(row, col, value);
+ }
+ }
+
+ /**
+ * The different sides a new binding set may appear on.
+ */
+ public static enum Side {
+ LEFT, RIGHT;
+ }
+
+ private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException {
+ // Get the common variable orders. These are used to build the prefix.
+ final VariableOrder childVarOrder = getVarOrder(tx, childId);
+ final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
+ List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
+
+ // Get the Binding strings
+ final String childBindingSetString = converter.convert(childBindingSet, childVarOrder);
+ final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
+
// Create the prefix that will be used to scan for binding sets of the sibling node.
// This prefix includes the sibling Node ID and the common variable values from
// childBindingSet.
- String bsPrefix = "";
+ String siblingScanPrefix = "";
for(int i = 0; i < commonVars.size(); i++) {
- if(bsPrefix.length() == 0) {
- bsPrefix = childBindingStrings[i];
+ if(siblingScanPrefix.length() == 0) {
+ siblingScanPrefix = childBindingStrings[i];
} else {
- bsPrefix += DELIM + childBindingStrings[i];
+ siblingScanPrefix += DELIM + childBindingStrings[i];
}
}
- bsPrefix = siblingId + NODEID_BS_DELIM + bsPrefix;
+ siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
// Scan the sibling node's binding sets for those that have the same
// common variable values as childBindingSet. These needs to be joined
// and inserted into the Join's results. It's possible that none of these
// results will be new Join results if they have already been created in
// earlier iterations of this algorithm.
- final ScannerConfiguration sc1 = new ScannerConfiguration();
- sc1.setSpan(Span.prefix(bsPrefix));
- setScanColumnFamily(sc1, siblingId);
-
- try {
- final RowIterator ri = tx.get(sc1);
- while(ri.hasNext()) {
- final ColumnIterator ci = ri.next().getValue();
- while(ci.hasNext()){
- // Get a sibling binding set.
- final String siblingBindingSetString = ci.next().getValue().toString();
- final String[] siblingBindingStrings = FluoStringConverter.toBindingStrings(siblingBindingSetString);
-
- // Overwrite the previous sibling's values to create a new join binding set.
- for (int i = 0; i < siblingBindingStrings.length; i++) {
- joinBindingSet.put(siblingVarOrderArray[i], siblingBindingStrings[i]);
- }
- final String joinBindingSetString = makeBindingSetString(joinVarOrder, joinBindingSet);
+ final ScannerConfiguration scanConfig = new ScannerConfiguration();
+ scanConfig.setSpan(Span.prefix(siblingScanPrefix));
+ setScanColumnFamily(scanConfig, siblingId);
- // Write the join binding set to Fluo.
- final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
- final Column col = FluoQueryColumns.JOIN_BINDING_SET;
- final Bytes value = encoder.encode(joinBindingSetString);
- tx.set(row, col, value);
- }
- }
- } catch (final Exception e) {
- log.error("Error while scanning sibling binding sets to create new join results.", e);
- }
+ final RowIterator ri = tx.get(scanConfig);
+ return new FluoTableIterator(ri, siblingVarOrder);
}
+
/**
* Fetch the {@link VariableOrder} of a query node.
*
@@ -216,40 +242,6 @@ public class JoinResultUpdater {
return commonVars;
}
-// /**
-// * Assuming that the common variables between two children are already
-// * shifted to the left, find the common variables between them.
-// * <p>
-// * Refer to {@link FluoQueryInitializer} to see why this assumption is being made.
-// *
-// * @param vars1 - The first child's variable order. (not null)
-// * @param vars2 - The second child's variable order. (not null)
-// * @return An ordered List of the common variables between the two children.
-// */
-// private List<String> getCommonVars(final String[] vars1, final String[] vars2) {
-// checkNotNull(vars1);
-// checkNotNull(vars2);
-//
-// final List<String> commonVars = new ArrayList<>();
-//
-// // Only need to iteratre through the shorted order's length.
-// final int shortestLen = Math.min(vars1.length, vars2.length);
-// for(int i = 0; i < shortestLen; i++) {
-// final String var1 = vars1[i];
-// final String var2 = vars2[i];
-//
-// if(var1.equals(var2)) {
-// commonVars.add(var1);
-// } else {
-// // Because the common variables are left shifted, we can break once
-// // we encounter a pair that does not match.
-// break;
-// }
-// }
-//
-// return commonVars;
-// }
-
/**
* Update a {@link ScannerConfiguration} to use the sibling node's binding
* set column for its scan. The column that will be used is determined by the
@@ -281,34 +273,211 @@ public class JoinResultUpdater {
column = FluoQueryColumns.JOIN_BINDING_SET;
break;
default:
- throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, or Filter.");
+ throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter.");
}
sc.fetchColumn(column.getFamily(), column.getQualifier());
}
/**
- * Create a Binding Set String from a variable order and a map of bindings.
- *
- * @param varOrder - The resulting binding set's variable order. (not null)
- * @param bindingSetValues - A map holding the variables and their values that will be
- * included in the resulting binding set.
- * @return A binding set string build from the map using the prescribed variable order.
+ * Defines each of the cases that may generate new join results when
+ * iteratively computing a query's join node.
*/
- private static String makeBindingSetString(final String[] varOrder, final Map<String, String> bindingSetValues) {
- checkNotNull(varOrder);
- checkNotNull(bindingSetValues);
+ public static interface IterativeJoin {
+
+ /**
+ * Invoked when a new {@link BindingSet} is emitted from the left child
+ * node of the join. The Fluo table is scanned for results on the right
+ * side that will be joined with the new result.
+ *
+ * @param newLeftResult - A new BindingSet that has been emitted from
+ * the left child node.
+ * @param rightResults - The right child node's binding sets that will
+ * be joined with the new left result. (not null)
+ * @return The new BindingSet results for the join.
+ */
+ public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults);
+
+ /**
+ * Invoked when a new {@link BindingSet} is emitted from the right child
+ * node of the join. The Fluo table is scanned for results on the left
+ * side that will be joined with the new result.
+ *
+ * @param leftResults - The left child node's binding sets that will be
+ * joined with the new right result.
+ * @param newRightResult - A new BindingSet that has been emitted from
+ * the right child node.
+ * @return The new BindingSet results for the join.
+ */
+ public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult);
+ }
- String bindingSetString = "";
+ /**
+ * Implements an {@link IterativeJoin} that uses the Natural Join algorithm
+ * defined by Relational Algebra.
+ * <p>
+ * This is how you combine {@code BindingSet}s that may have common Binding
+ * names. When two Binding Sets are joined, any bindings that appear in both
+ * binding sets are only included once.
+ */
+ public static final class NaturalJoin implements IterativeJoin {
+ @Override
+ public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+ checkNotNull(newLeftResult);
+ checkNotNull(rightResults);
+
+ // Both sides are required, so if there are no right results, then do not emit anything.
+ return new LazyJoiningIterator(newLeftResult, rightResults);
+ }
- for (final String joinVar : varOrder) {
- if (bindingSetString.length() == 0) {
- bindingSetString = bindingSetValues.get(joinVar);
- } else {
- bindingSetString = bindingSetString + DELIM + bindingSetValues.get(joinVar);
+ @Override
+ public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult) {
+ checkNotNull(leftResults);
+ checkNotNull(newRightResult);
+
+ // Both sides are required, so if there are no left results, then do not emit anything.
+ return new LazyJoiningIterator(newRightResult, leftResults);
+ }
+ }
+
+ /**
+ * Implements an {@link IterativeJoin} that uses the Left Outer Join
+ * algorithm defined by Relational Algebra.
+ * <p>
+ * This is how you add optional information to a {@link BindingSet}. Left
+ * binding sets are emitted even if they do not join with anything on the right.
+ * However, right binding sets must be joined with a left binding set.
+ */
+ public static final class LeftOuterJoin implements IterativeJoin {
+ @Override
+ public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+ checkNotNull(newLeftResult);
+ checkNotNull(rightResults);
+
+ // If the required portion does not join with any optional portions,
+ // then emit a BindingSet that matches the new left result.
+ if(!rightResults.hasNext()) {
+ return Lists.<BindingSet>newArrayList(newLeftResult).iterator();
+ }
+
+ // Otherwise, return an iterator that holds the new required result
+ // joined with the right results.
+ return new LazyJoiningIterator(newLeftResult, rightResults);
+ }
+
+ @Override
+ public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> leftResults, final BindingSet newRightResult) {
+ checkNotNull(leftResults);
+ checkNotNull(newRightResult);
+
+ // The right result is optional, so if it does not join with anything
+ // on the left, then do not emit anything.
+ return new LazyJoiningIterator(newRightResult, leftResults);
+ }
+ }
+
+ /**
+ * Joins a {@link BindingSet} (which is new to the left or right side of a join)
+ * to all binding sets on the other side that join with it.
+ * <p>
+ * This is done lazily so that you don't have to load all of the BindingSets
+ * into memory at once.
+ */
+ private static final class LazyJoiningIterator implements Iterator<BindingSet> {
+
+ private final BindingSet newResult;
+ private final Iterator<BindingSet> joinedResults;
+
+ /**
+ * Constructs an instance of {@link LazyJoiningIterator}.
+ *
+ * @param newResult - A binding set that will be joined with some other binding sets. (not null)
+ * @param joinResults - The binding sets that will be joined with {@code newResult}. (not null)
+ */
+ public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> joinResults) {
+ this.newResult = checkNotNull(newResult);
+ this.joinedResults = checkNotNull(joinResults);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return joinedResults.hasNext();
+ }
+
+ @Override
+ public BindingSet next() {
+ final MapBindingSet bs = new MapBindingSet();
+
+ for(Binding binding : newResult) {
+ bs.addBinding(binding);
+ }
+
+ for(Binding binding : joinedResults.next()) {
+ bs.addBinding(binding);
}
+
+ return bs;
}
- return bindingSetString;
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() is unsupported.");
+ }
}
-}
+ /**
+ * Iterates over rows that have a Binding Set column and returns the unmarshalled
+ * {@link BindingSet}s.
+ */
+ private static final class FluoTableIterator implements Iterator<BindingSet> {
+
+ private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
+ FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
+ FluoQueryColumns.JOIN_BINDING_SET,
+ FluoQueryColumns.FILTER_BINDING_SET);
+
+ private final RowIterator rows;
+ private final VariableOrder varOrder;
+
+ /**
+ * Constructs an instance of {@link FluoTableIterator}.
+ *
+ * @param rows - Iterates over RowId values in a Fluo Table. (not null)
+ * @param varOrder - The Variable Order of binding sets that will be
+ * read from the Fluo Table. (not null)
+ */
+ public FluoTableIterator(RowIterator rows, VariableOrder varOrder) {
+ this.rows = checkNotNull(rows);
+ this.varOrder = checkNotNull(varOrder);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return rows.hasNext();
+ }
+
+ @Override
+ public BindingSet next() {
+ final ColumnIterator columns = rows.next().getValue();
+
+ while(columns.hasNext()) {
+ // If this is one of the BindingSet columns, handle it and return the BindingSet.
+ final Entry<Column, Bytes> entry = columns.next();
+ if(BINDING_SET_COLUMNS.contains(entry.getKey())) {
+ final String bindingSetString = entry.getValue().toString();
+ try {
+ return converter.convert(bindingSetString, varOrder);
+ } catch (BindingSetConversionException e) {
+ throw new RuntimeException("Could not convert one of the stored BindingSets from a String: " + bindingSetString, e);
+ }
+ }
+ }
+
+ throw new RuntimeException("Row did not containing a Binding Set.");
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() is unsupported.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index e4d0155..f3ff089 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -46,6 +46,8 @@ public class QueryResultUpdater {
private final Encoder encoder = new StringEncoder();
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
/**
* Updates the results of a Query node when one of its children has added a
* new Binding Set to its results.
@@ -67,10 +69,12 @@ public class QueryResultUpdater {
final MapBindingSet queryBindingSet = new MapBindingSet();
for(final String bindingName : queryVarOrder) {
- final Binding binding = childBindingSet.getBinding(bindingName);
- queryBindingSet.addBinding(binding);
+ if(childBindingSet.hasBinding(bindingName)) {
+ final Binding binding = childBindingSet.getBinding(bindingName);
+ queryBindingSet.addBinding(binding);
+ }
}
- final String queryBindingSetString = BindingSetStringConverter.toString(queryBindingSet, queryVarOrder);
+ final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder);
// Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry.
final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index d3c5bb5..aa944e4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -38,6 +38,7 @@ import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.types.TypedObserver;
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
/**
* Notified when the results of a node have been updated to include a new Binding
@@ -97,7 +98,11 @@ public abstract class BindingSetUpdater extends TypedObserver {
case JOIN:
final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId);
- joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin);
+ try {
+ joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin);
+ } catch (BindingSetConversionException e) {
+ throw new RuntimeException("Could not process a Join node.", e);
+ }
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index c6b6303..2accde3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
* Notified when the results of a Filter have been updated to include a new
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
*/
public class FilterObserver extends BindingSetUpdater {
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@Override
@@ -53,9 +56,8 @@ public class FilterObserver extends BindingSetUpdater {
final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId);
// Read the Binding Set that was just emmitted by the Filter.
- final String[] filterBindingStrings = parsedRow.getBindingStrings();
- final String[] filterVarOrder = filterMetadata.getVariableOrder().toArray();
- final BindingSet filterBindingSet = FluoStringConverter.toBindingSet(filterBindingStrings, filterVarOrder);
+ final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
+ final BindingSet filterBindingSet = converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = filterMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 980c7bc..43b0a4e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
* Notified when the results of a Join have been updated to include a new
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
*/
public class JoinObserver extends BindingSetUpdater {
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@Override
@@ -52,9 +55,8 @@ public class JoinObserver extends BindingSetUpdater {
final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId);
// Read the Binding Set that was just emmitted by the Join.
- final String[] joinBindingStrings = parsedRow.getBindingStrings();
- final String[] joinVarOrder = joinMetadata.getVariableOrder().toArray();
- final BindingSet joinBindingSet = FluoStringConverter.toBindingSet(joinBindingStrings, joinVarOrder);
+ final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+ final BindingSet joinBindingSet = converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = joinMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 8a60039..7c1a588 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
@@ -39,6 +38,8 @@ import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.types.TypedObserver;
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
* Performs incremental result exporting to the configured destinations.
@@ -48,6 +49,8 @@ public class QueryResultObserver extends TypedObserver {
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
/**
* Builders for each type of result exporter we support.
*/
@@ -96,10 +99,10 @@ public class QueryResultObserver extends TypedObserver {
// Fetch the query's Variable Order from the Fluo table.
final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId);
- final String[] varOrder = queryMetadata.getVariableOrder().toArray();
+ final VariableOrder varOrder = queryMetadata.getVariableOrder();
// Export the result using each of the provided exporters.
- final BindingSet result = FluoStringConverter.toBindingSet(bindingSetString, varOrder);
+ BindingSet result = converter.convert(bindingSetString, varOrder);
for(final IncrementalResultExporter exporter : exporters) {
try {
exporter.export(tx, queryId, result);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 00f9b13..ddba9a2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -21,13 +21,14 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
+import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
/**
* Notified when the results of a Statement Pattern have been updated to include
@@ -36,6 +37,8 @@ import io.fluo.api.client.TransactionBase;
*/
public class StatementPatternObserver extends BindingSetUpdater {
+ private final BindingSetStringConverter converter = new BindingSetStringConverter();
+
// DAO
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -53,9 +56,8 @@ public class StatementPatternObserver extends BindingSetUpdater {
final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
// Read the Binding Set that was just emmitted by the Statement Pattern.
- final String[] spBindingStrings = parsedRow.getBindingStrings();
- final String[] spVarOrder = spMetadata.getVariableOrder().toArray();
- final BindingSet spBindingSet = FluoStringConverter.toBindingSet(spBindingStrings, spVarOrder);
+ final VariableOrder spVarOrder = spMetadata.getVariableOrder();
+ final BindingSet spBindingSet = converter.convert(parsedRow.getBindingSetString(), spVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = spMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 3d766ed..fa25456 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -54,6 +54,7 @@ import io.fluo.api.data.Column;
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:nodeId</td> <td>The Node ID of the Join.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
+ * <tr> <td>Node ID</td> <td>joinMetadata:joinType</td> <td>The Join algorithm that will be used when computing join results.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
@@ -128,6 +129,7 @@ public class FluoQueryColumns {
// Join Metadata columns.
public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId");
public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder");
+ public static final Column JOIN_TYPE = new Column(JOIN_METADATA_CF, "joinType");
public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId");
public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId");
public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 0ab5f18..265ca0f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import com.google.common.collect.Sets;
@@ -170,6 +171,7 @@ public class FluoQueryMetadataDAO {
final Bytes rowId = encoder.encode(metadata.getNodeId());
tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
+ tx.set(rowId, FluoQueryColumns.JOIN_TYPE, encoder.encode(metadata.getJoinType().toString()) );
tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, encoder.encode( metadata.getLeftChildNodeId() ));
tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, encoder.encode( metadata.getRightChildNodeId() ));
@@ -194,6 +196,7 @@ public class FluoQueryMetadataDAO {
final Bytes rowId = encoder.encode(nodeId);
final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
FluoQueryColumns.JOIN_VARIABLE_ORDER,
+ FluoQueryColumns.JOIN_TYPE,
FluoQueryColumns.JOIN_PARENT_NODE_ID,
FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID));
@@ -202,12 +205,16 @@ public class FluoQueryMetadataDAO {
final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER));
final VariableOrder varOrder = new VariableOrder(varOrderString);
+ String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
+ JoinType joinType = JoinType.valueOf(joinTypeString);
+
final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) );
final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) );
final String rightChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID) );
return JoinMetadata.builder(nodeId)
.setVariableOrder(varOrder)
+ .setJoinType(joinType)
.setParentNodeId(parentNodeId)
.setLeftChildNodeId(leftChildNodeId)
.setRightChildNodeId(rightChildNodeId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index 2546972..566ce61 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -37,6 +37,15 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
@ParametersAreNonnullByDefault
public class JoinMetadata extends CommonNodeMetadata {
+ /**
+ * The different types of Join algorithms that this join may perform.
+ */
+ public static enum JoinType {
+ NATURAL_JOIN,
+ LEFT_OUTER_JOIN;
+ }
+
+ private final JoinType joinType;
private final String parentNodeId;
private final String leftChildNodeId;
private final String rightChildNodeId;
@@ -46,6 +55,7 @@ public class JoinMetadata extends CommonNodeMetadata {
*
* @param nodeId - The ID the Fluo app uses to reference this node. (not null)
* @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
+ * @param joinType - Defines which join algorithm the join will use.
* @param parentNodeId - The node id of this node's parent. (not null)
* @param leftChildNodeId - One of the nodes whose results are being joined. (not null)
* @param rightChildNodeId - The other node whose results are being joined. (not null)
@@ -53,16 +63,25 @@ public class JoinMetadata extends CommonNodeMetadata {
public JoinMetadata(
final String nodeId,
final VariableOrder varOrder,
+ final JoinType joinType,
final String parentNodeId,
final String leftChildNodeId,
final String rightChildNodeId) {
super(nodeId, varOrder);
+ this.joinType = checkNotNull(joinType);
this.parentNodeId = checkNotNull(parentNodeId);
this.leftChildNodeId = checkNotNull(leftChildNodeId);
this.rightChildNodeId = checkNotNull(rightChildNodeId);
}
/**
+ * @return Defines which join algorithm the join will use.
+ */
+ public JoinType getJoinType() {
+ return joinType;
+ }
+
+ /**
* @return The node id of this node's parent.
*/
public String getParentNodeId() {
@@ -88,6 +107,7 @@ public class JoinMetadata extends CommonNodeMetadata {
return Objects.hashCode(
super.getNodeId(),
super.getVariableOrder(),
+ joinType,
parentNodeId,
leftChildNodeId,
rightChildNodeId);
@@ -103,6 +123,7 @@ public class JoinMetadata extends CommonNodeMetadata {
if(super.equals(o)) {
final JoinMetadata joinMetadata = (JoinMetadata)o;
return new EqualsBuilder()
+ .append(joinType, joinMetadata.joinType)
.append(parentNodeId, joinMetadata.parentNodeId)
.append(leftChildNodeId, joinMetadata.leftChildNodeId)
.append(rightChildNodeId, joinMetadata.rightChildNodeId)
@@ -120,6 +141,7 @@ public class JoinMetadata extends CommonNodeMetadata {
.append("Join Metadata {\n")
.append(" Node ID: " + super.getNodeId() + "\n")
.append(" Variable Order: " + super.getVariableOrder() + "\n")
+ .append(" Join Type: " + joinType + "\n")
.append(" Parent Node ID: " + parentNodeId + "\n")
.append(" Left Child Node ID: " + leftChildNodeId + "\n")
.append(" Right Child Node ID: " + rightChildNodeId + "\n")
@@ -145,6 +167,7 @@ public class JoinMetadata extends CommonNodeMetadata {
private final String nodeId;
private VariableOrder varOrder;
+ private JoinType joinType;
private String parentNodeId;
private String leftChildNodeId;
private String rightChildNodeId;
@@ -188,6 +211,17 @@ public class JoinMetadata extends CommonNodeMetadata {
}
/**
+ * Sets the type of join algorithm that will be used by this join.
+ *
+ * @param joinType - Defines which join algorithm the join will use.
+ * @return This builder so that method invocation could be chained.
+ */
+ public Builder setJoinType(@Nullable final JoinType joinType) {
+ this.joinType = joinType;
+ return this;
+ }
+
+ /**
* Set one of the nodes whose results are being joined.
*
* @param leftChildNodeId - One of the nodes whose results are being joined.
@@ -216,6 +250,7 @@ public class JoinMetadata extends CommonNodeMetadata {
return new JoinMetadata(
nodeId,
varOrder,
+ joinType,
parentNodeId,
leftChildNodeId,
rightChildNodeId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 844a7a4..25b4e4f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,7 +19,6 @@
package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter.toVarOrderString;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -39,8 +38,10 @@ import javax.annotation.concurrent.Immutable;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.openrdf.query.algebra.Filter;
import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.LeftJoin;
import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
@@ -152,7 +153,7 @@ public class SparqlFluoQueryBuilder {
prefix = SP_PREFIX;
} else if(node instanceof Filter) {
prefix = FILTER_PREFIX;
- } else if(node instanceof Join) {
+ } else if(node instanceof Join || node instanceof LeftJoin) {
prefix = JOIN_PREFIX;
} else if(node instanceof Projection) {
prefix = QUERY_PREFIX;
@@ -218,18 +219,36 @@ public class SparqlFluoQueryBuilder {
}
@Override
+ public void meet(LeftJoin node) {
+ // Extract the metadata that will be stored for the node.
+ String leftJoinNodeId = nodeIds.getOrMakeId(node);
+ final QueryModelNode left = node.getLeftArg();
+ final QueryModelNode right = node.getRightArg();
+
+ // Update the metadata for the JoinMetadata.Builder.
+ makeJoinMetadata(leftJoinNodeId, JoinType.LEFT_OUTER_JOIN, left, right);
+
+ // Walk to the next node.
+ super.meet(node);
+ }
+
+ @Override
public void meet(final Join node) {
// Extract the metadata that will be stored from the node.
final String joinNodeId = nodeIds.getOrMakeId(node);
final QueryModelNode left = node.getLeftArg();
final QueryModelNode right = node.getRightArg();
- if(left == null || right == null) {
- throw new IllegalArgumentException("Join args connot be null.");
- }
+ // Update the metadata for the JoinMetadata.Builder.
+ makeJoinMetadata(joinNodeId, JoinType.NATURAL_JOIN, left, right);
+
+ // Walk to the next node.
+ super.meet(node);
+ }
+ private void makeJoinMetadata(String joinNodeId, JoinType joinType, QueryModelNode left, QueryModelNode right) {
final String leftChildNodeId = nodeIds.getOrMakeId(left);
- final String rightChildNodeId = nodeIds.getOrMakeId( right );
+ final String rightChildNodeId = nodeIds.getOrMakeId(right);
// Get or create a builder for this node populated with the known metadata.
JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull();
@@ -237,6 +256,7 @@ public class SparqlFluoQueryBuilder {
joinBuilder = JoinMetadata.builder(joinNodeId);
fluoQueryBuilder.addJoinMetadata(joinBuilder);
}
+ joinBuilder.setJoinType(joinType);
joinBuilder.setLeftChildNodeId( leftChildNodeId );
joinBuilder.setRightChildNodeId( rightChildNodeId );
@@ -247,15 +267,12 @@ public class SparqlFluoQueryBuilder {
final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, rightVars);
// Create or update the left child's variable order and parent node id.
- final VariableOrder leftVarOrder = new VariableOrder( varOrders.getLeftVarOrder() );
+ final VariableOrder leftVarOrder = varOrders.getLeftVarOrder();
setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId);
// Create or update the right child's variable order and parent node id.
- final VariableOrder rightVarOrder = new VariableOrder( varOrders.getRightVarOrder() );
+ final VariableOrder rightVarOrder = varOrders.getRightVarOrder();
setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId);
-
- // Walk to the next node.
- super.meet(node);
}
@Override
@@ -282,7 +299,7 @@ public class SparqlFluoQueryBuilder {
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr)child);
- final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) );
+ final VariableOrder childVarOrder = new VariableOrder(childVars);
setChildMetadata(childNodeId, childVarOrder, filterId);
// Walk to the next node.
@@ -293,7 +310,7 @@ public class SparqlFluoQueryBuilder {
public void meet(final Projection node) {
// Create a builder for this node populated with the metadata.
final String queryId = nodeIds.getOrMakeId(node);
- final VariableOrder queryVarOrder = new VariableOrder( toVarOrderString( node.getAssuredBindingNames() ) );
+ final VariableOrder queryVarOrder = new VariableOrder(node.getBindingNames());
final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
fluoQueryBuilder.setQueryMetadata(queryBuilder);
@@ -311,7 +328,7 @@ public class SparqlFluoQueryBuilder {
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr)child);
- final VariableOrder childVarOrder = new VariableOrder( toVarOrderString(childVars) );
+ final VariableOrder childVarOrder = new VariableOrder(childVars);
setChildMetadata(childNodeId, childVarOrder, queryId);
@@ -386,10 +403,9 @@ public class SparqlFluoQueryBuilder {
final Set<String> vars = Sets.newHashSet();
- final Set<String> abn = node.getAssuredBindingNames();
- for(final String s: abn) {
- if(!s.startsWith("-const-")) {
- vars.add(s);
+ for(final String bindingName : node.getBindingNames()) {
+ if(!bindingName.startsWith("-const-")) {
+ vars.add(bindingName);
}
}
@@ -402,8 +418,8 @@ public class SparqlFluoQueryBuilder {
@Immutable
@ParametersAreNonnullByDefault
private static final class JoinVarOrders {
- private final String leftVarOrder;
- private final String rightVarOrder;
+ private final VariableOrder leftVarOrder;
+ private final VariableOrder rightVarOrder;
/**
* Constructs an instance of {@link JoinVarOrders}.
@@ -411,7 +427,7 @@ public class SparqlFluoQueryBuilder {
* @param leftVarOrder - The left child's Variable Order. (not null)
* @param rightVarOrder - The right child's Variable Order. (not null)
*/
- public JoinVarOrders(final String leftVarOrder, final String rightVarOrder) {
+ public JoinVarOrders(final VariableOrder leftVarOrder, final VariableOrder rightVarOrder) {
this.leftVarOrder = checkNotNull(leftVarOrder);
this.rightVarOrder = checkNotNull(rightVarOrder);
}
@@ -419,14 +435,14 @@ public class SparqlFluoQueryBuilder {
/**
* @return The left child's Variable Order.
*/
- public String getLeftVarOrder() {
+ public VariableOrder getLeftVarOrder() {
return leftVarOrder;
}
/**
* @return The right child's Variable Order.
*/
- public String getRightVarOrder() {
+ public VariableOrder getRightVarOrder() {
return rightVarOrder;
}
}
@@ -450,7 +466,7 @@ public class SparqlFluoQueryBuilder {
// Push all of the common variables to the left for each child's vars.
final List<String> leftVarOrder = leftShiftCommonVars(commonVars, leftVars);
final List<String> rightVarOrder = leftShiftCommonVars(commonVars, rightVars);
- return new JoinVarOrders(toVarOrderString(leftVarOrder), toVarOrderString(rightVarOrder));
+ return new JoinVarOrders(new VariableOrder(leftVarOrder), new VariableOrder(rightVarOrder));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
index a672bd2..4ad5189 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -19,94 +19,20 @@
package org.apache.rya.indexing.pcj.fluo.app;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
import org.junit.Test;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.beust.jcommander.internal.Lists;
/**
* Tests the methods of {@link FluoStringConverter}.
*/
public class FluoStringConverterTest {
- @Test
- public void varOrderToString() {
- // Setup the variable order that will be converted.
- final Collection<String> varOrder = Lists.newArrayList("x", "y", "z");
-
- // Convert it to a String.
- final String varOrderString = FluoStringConverter.toVarOrderString(varOrder);
-
- // Ensure it converted to the expected result.
- final String expected = "x;y;z";
- assertEquals(expected, varOrderString);
- }
-
- @Test
- public void stringToVarOrder() {
- // Setup the String that will be converted.
- final String varOrderString = "x;y;z";
-
- // Convert it to an array in variable order.
- final String[] varOrder = FluoStringConverter.toVarOrder(varOrderString);
-
- // Ensure it converted to the expected result.
- final String[] expected = {"x", "y", "z"};
- assertTrue( Arrays.equals(expected, varOrder) );
- }
-
- @Test
- public void bindingSetToString() {
- // Setup the binding set that will be converted.
- final MapBindingSet originalBindingSet = new MapBindingSet();
- originalBindingSet.addBinding("x", new URIImpl("http://a"));
- originalBindingSet.addBinding("y", new URIImpl("http://b"));
- originalBindingSet.addBinding("z", new URIImpl("http://c"));
-
- // Convert it to a String.
- final String[] varOrder = new String[] {"y", "z", "x" };
- final String bindingSetString = FluoStringConverter.toBindingSetString(originalBindingSet, varOrder);
-
- // Ensure it converted to the expected result.
- final String expected = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
- "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
- "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
-
- assertEquals(expected, bindingSetString);
- }
-
- @Test
- public void stringToBindingSet() {
- // Setup the String that will be converted.
- final String bindingSetString = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
- "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
- "http://a<<~>>http://www.w3.org/2001/XMLSchema#anyURI";
-
- // Convert it to a BindingSet
- final String[] varOrder = new String[] {"y", "z", "x" };
- final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder);
-
- // Ensure it converted to the expected result.
- final MapBindingSet expected = new MapBindingSet();
- expected.addBinding("x", new URIImpl("http://a"));
- expected.addBinding("y", new URIImpl("http://b"));
- expected.addBinding("z", new URIImpl("http://c"));
-
- assertEquals(expected, bindingSet);
- }
-
@Test
public void statementPatternToString() throws MalformedQueryException {
// Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>."
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a24515b0/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
new file mode 100644
index 0000000..025c3e7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests the methods of {@link LeftOuterJoin}.
+ */
+public class LeftOuterJoinTest {
+
+ private final ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void newLeftResult_noRightMatches() {
+ IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+ // There is a new left result.
+ MapBindingSet newLeftResult = new MapBindingSet();
+ newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+
+ // There are no right results that join with the left result.
+ Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+
+ // Therefore, the left result is a new join result.
+ Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+
+ Set<BindingSet> newJoinResults = new HashSet<>();
+ while(newJoinResultsIt.hasNext()) {
+ newJoinResults.add( newJoinResultsIt.next() );
+ }
+
+ Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
+
+ assertEquals(expected, newJoinResults);
+ }
+
+ @Test
+ public void newLeftResult_joinsWithRightResults() {
+ IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+ // There is a new left result.
+ MapBindingSet newLeftResult = new MapBindingSet();
+ newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+ newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+
+ // There are a few right results that join with the left result.
+ MapBindingSet nameAge = new MapBindingSet();
+ nameAge.addBinding("name", vf.createLiteral("Bob"));
+ nameAge.addBinding("age", vf.createLiteral(56));
+
+ MapBindingSet nameHair = new MapBindingSet();
+ nameHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+ Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+ // Therefore, there are a few new join results that mix the two together.
+ Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+
+ Set<BindingSet> newJoinResults = new HashSet<>();
+ while(newJoinResultsIt.hasNext()) {
+ newJoinResults.add( newJoinResultsIt.next() );
+ }
+
+ Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ MapBindingSet nameHeightAge = new MapBindingSet();
+ nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightAge.addBinding("age", vf.createLiteral(56));
+ expected.add(nameHeightAge);
+
+ MapBindingSet nameHeightHair = new MapBindingSet();
+ nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+ expected.add(nameHeightHair);
+
+ assertEquals(expected, newJoinResults);
+ }
+
+ @Test
+ public void newRightResult_noLeftMatches() {
+ IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+ // There are no left results.
+ Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+
+ // There is a new right result.
+ MapBindingSet newRightResult = new MapBindingSet();
+ newRightResult.addBinding("name", vf.createLiteral("Bob"));
+
+ // Therefore, there are no new join results.
+ Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+ assertFalse( newJoinResultsIt.hasNext() );
+ }
+
+ @Test
+ public void newRightResult_joinsWithLeftResults() {
+ IterativeJoin leftOuterJoin = new LeftOuterJoin();
+
+ // There are a few left results that join with the new right result.
+ MapBindingSet nameAge = new MapBindingSet();
+ nameAge.addBinding("name", vf.createLiteral("Bob"));
+ nameAge.addBinding("age", vf.createLiteral(56));
+
+ MapBindingSet nameHair = new MapBindingSet();
+ nameHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+
+ Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+
+ // There is a new right result.
+ MapBindingSet newRightResult = new MapBindingSet();
+ newRightResult.addBinding("name", vf.createLiteral("Bob"));
+ newRightResult.addBinding("height", vf.createLiteral("5'9\""));
+
+ // Therefore, there are a few new join results that mix the two together.
+ Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+
+ Set<BindingSet> newJoinResults = new HashSet<>();
+ while(newJoinResultsIt.hasNext()) {
+ newJoinResults.add( newJoinResultsIt.next() );
+ }
+
+ Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ MapBindingSet nameHeightAge = new MapBindingSet();
+ nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightAge.addBinding("age", vf.createLiteral(56));
+ expected.add(nameHeightAge);
+
+ MapBindingSet nameHeightHair = new MapBindingSet();
+ nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
+ nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
+ nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
+ expected.add(nameHeightHair);
+
+ assertEquals(expected, newJoinResults);
+ }
+}
\ No newline at end of file