You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/12 21:25:31 UTC
[3/8] incubator-rya git commit: RYA-303 Mongo PCJ Support. Closes
#172.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java
new file mode 100644
index 0000000..1503b53
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoITBase;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+public class MongoPCJIndexIT extends MongoITBase {
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    // Disable the PCJ index by default; individual tests flip USE_PCJ on after
+    // loading data so each test controls when PCJ query planning kicks in.
+    @Override
+    protected void updateConfiguration(final MongoDBRdfConfiguration conf) {
+        conf.setBoolean(ConfigUtils.USE_MONGO, true);
+        conf.setBoolean(ConfigUtils.USE_PCJ, false);
+    }
+
+    /**
+     * Installs Rya with the PCJ index enabled, creates and batch-loads a PCJ, then
+     * drops the triples collection so the only possible source of query results is
+     * the PCJ index itself.
+     */
+    @Test
+    public void sparqlQuery_Test() throws Exception {
+        // Setup a Rya Client.
+        final MongoConnectionDetails connectionDetails = getConnectionDetails();
+        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, getMongoClient());
+        final String pcjQuery = "SELECT ?name WHERE {"
+                + " ?name <urn:likes> <urn:icecream> ."
+                + " ?name <urn:hasEyeColor> <urn:blue> ."
+                + " }";
+
+        // Install an instance of Rya and load statements.
+        ryaClient.getInstall().install(conf.getRyaInstanceName(), InstallConfiguration.builder()
+                .setEnablePcjIndex(true)
+                .build());
+        ryaClient.getLoadStatements().loadStatements(conf.getRyaInstanceName(), getStatements());
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(conf.getRyaInstanceName(), pcjQuery);
+        ryaClient.getBatchUpdatePCJ().batchUpdate(conf.getRyaInstanceName(), pcjId);
+
+        // Purge contents of the rya triples collection.
+        getMongoClient().getDatabase(conf.getRyaInstanceName()).getCollection(conf.getTriplesCollectionName()).drop();
+
+        // Run the query. Since the triples collection is gone, if the results match, they came from the PCJ index.
+        conf.setBoolean(ConfigUtils.USE_PCJ, true);
+        conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, true);
+        conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true);
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        final SailRepositoryConnection conn = new SailRepository(sail).getConnection();
+        final Set<BindingSet> results = new HashSet<>();
+        try {
+            conn.begin();
+            final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, pcjQuery);
+            tupleQuery.setBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG, RdfCloudTripleStoreConstants.VALUE_FACTORY.createLiteral(true));
+            final TupleQueryResult rez = tupleQuery.evaluate();
+            try {
+                while(rez.hasNext()) {
+                    results.add(rez.next());
+                }
+            } finally {
+                rez.close();
+            }
+        } finally {
+            // Always release the connection and Sail; the original leaked both.
+            conn.close();
+            sail.shutDown();
+        }
+
+        // Verify the correct results were loaded into the PCJ table: everyone with
+        // blue eyes who likes icecream.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+        for (final String name : new String[] { "urn:Alice", "urn:Bob", "urn:Charlie", "urn:David", "urn:Eve", "urn:Frank" }) {
+            final MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("name", VF.createURI(name));
+            expectedResults.add(bs);
+        }
+
+        assertEquals(6, results.size());
+        assertEquals(expectedResults, results);
+    }
+
+    /**
+     * Queries with an extra hair-color pattern that is NOT covered by the PCJ, so
+     * evaluation must join PCJ results with live triples from the triples collection
+     * (which is intentionally left intact in this test).
+     */
+    @Test
+    public void sparqlQuery_Test_complex() throws Exception {
+        // Setup a Rya Client.
+        final MongoConnectionDetails connectionDetails = getConnectionDetails();
+        final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, getMongoClient());
+        final String pcjQuery = "SELECT ?name WHERE {"
+                + " ?name <urn:likes> <urn:icecream> ."
+                + " ?name <urn:hasEyeColor> <urn:blue> ."
+                + " }";
+
+        final String testQuery =
+                "SELECT ?name WHERE {"
+                + " ?name <urn:hasHairColor> <urn:brown> ."
+                + " ?name <urn:likes> <urn:icecream> ."
+                + " ?name <urn:hasEyeColor> <urn:blue> ."
+                + " }";
+
+        // Install an instance of Rya and load statements.
+        conf.setBoolean(ConfigUtils.USE_PCJ, true);
+        conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, true);
+        conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true);
+        ryaClient.getInstall().install(conf.getRyaInstanceName(), InstallConfiguration.builder()
+                .setEnablePcjIndex(true)
+                .build());
+        ryaClient.getLoadStatements().loadStatements(conf.getRyaInstanceName(), getStatements());
+        final String pcjId = ryaClient.getCreatePCJ().createPCJ(conf.getRyaInstanceName(), pcjQuery);
+        ryaClient.getBatchUpdatePCJ().batchUpdate(conf.getRyaInstanceName(), pcjId);
+
+        // Run the query. The hair-color pattern is not part of the PCJ, so answering
+        // it requires joining the PCJ with the still-populated triples collection.
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        final SailRepositoryConnection conn = new SailRepository(sail).getConnection();
+        final Set<BindingSet> results = new HashSet<>();
+        try {
+            conn.begin();
+            final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, testQuery);
+            tupleQuery.setBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG, RdfCloudTripleStoreConstants.VALUE_FACTORY.createLiteral(true));
+            final TupleQueryResult rez = tupleQuery.evaluate();
+            try {
+                while(rez.hasNext()) {
+                    results.add(rez.next());
+                }
+            } finally {
+                rez.close();
+            }
+        } finally {
+            // Always release the connection and Sail; the original leaked both.
+            conn.close();
+            sail.shutDown();
+        }
+
+        // Only David, Eve, and Frank have blue eyes AND brown hair.
+        final Set<BindingSet> expectedResults = new HashSet<>();
+        for (final String name : new String[] { "urn:David", "urn:Eve", "urn:Frank" }) {
+            final MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("name", VF.createURI(name));
+            expectedResults.add(bs);
+        }
+
+        assertEquals(3, results.size());
+        assertEquals(expectedResults, results);
+    }
+
+ private MongoConnectionDetails getConnectionDetails() {
+ final java.util.Optional<char[]> password = conf.getMongoPassword() != null ?
+ java.util.Optional.of(conf.getMongoPassword().toCharArray()) :
+ java.util.Optional.empty();
+
+ return new MongoConnectionDetails(
+ conf.getMongoHostname(),
+ Integer.parseInt(conf.getMongoPort()),
+ java.util.Optional.ofNullable(conf.getMongoUser()),
+ password);
+ }
+
+    /**
+     * Test fixture: eight people who all like icecream, with per-person eye and
+     * hair colors (parallel arrays, one row per person).
+     */
+    private Set<Statement> getStatements() throws Exception {
+        final String[] people = {
+                "urn:Alice", "urn:Bob", "urn:Charlie", "urn:David",
+                "urn:Eve", "urn:Frank", "urn:George", "urn:Hillary" };
+        final String[] eyeColors = {
+                "urn:blue", "urn:blue", "urn:blue", "urn:blue",
+                "urn:blue", "urn:blue", "urn:green", "urn:brown" };
+        final String[] hairColors = {
+                "urn:blue", "urn:blue", "urn:blue", "urn:brown",
+                "urn:brown", "urn:brown", "urn:blonde", "urn:blonde" };
+
+        final Set<Statement> statements = new HashSet<>();
+        for (int i = 0; i < people.length; i++) {
+            statements.add(VF.createStatement(VF.createURI(people[i]), VF.createURI("urn:likes"), VF.createURI("urn:icecream")));
+            statements.add(VF.createStatement(VF.createURI(people[i]), VF.createURI("urn:hasEyeColor"), VF.createURI(eyeColors[i])));
+            statements.add(VF.createStatement(VF.createURI(people[i]), VF.createURI("urn:hasHairColor"), VF.createURI(hairColors[i])));
+        }
+        return statements;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java
new file mode 100644
index 0000000..af81cf6
--- /dev/null
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.IndexPlanValidator.IndexPlanValidator;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PcjIntegrationTestingUtil;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider;
+import org.apache.rya.indexing.mongodb.pcj.MongoPcjQueryNode;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoITBase;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.QueryResultHandlerException;
+import org.openrdf.query.TupleQueryResultHandler;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+import com.google.common.collect.Lists;
+
+public class MongoPcjIntegrationTest extends MongoITBase {
+ private static final URI talksTo = new URIImpl("uri:talksTo");
+ private static final URI sub = new URIImpl("uri:entity");
+ private static final URI sub2 = new URIImpl("uri:entity2");
+ private static final URI subclass = new URIImpl("uri:class");
+ private static final URI subclass2 = new URIImpl("uri:class2");
+ private static final URI obj = new URIImpl("uri:obj");
+ private static final URI obj2 = new URIImpl("uri:obj2");
+
+    // Loads the small two-entity fixture graph (type, label, and talksTo triples
+    // for uri:entity and uri:entity2) that every test in this class queries against.
+    private void addPCJS(final SailRepositoryConnection conn) throws Exception {
+        conn.add(sub, RDF.TYPE, subclass);
+        conn.add(sub, RDFS.LABEL, new LiteralImpl("label"));
+        conn.add(sub, talksTo, obj);
+
+        conn.add(sub2, RDF.TYPE, subclass2);
+        conn.add(sub2, RDFS.LABEL, new LiteralImpl("label2"));
+        conn.add(sub2, talksTo, obj2);
+    }
+
+    // Store PCJs in Mongo and disable incremental PCJ updating; the tests populate
+    // their PCJs explicitly through PcjIntegrationTestingUtil.createAndPopulatePcj.
+    @Override
+    protected void updateConfiguration(final MongoDBRdfConfiguration conf) {
+        conf.set(PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.MONGO.name());
+        conf.set(PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.name());
+    }
+
+    /**
+     * Evaluates the same SPARQL query against a non-PCJ Rya instance and a
+     * PCJ-enabled one, and asserts both return the same number of results.
+     */
+    @Test
+    public void testEvaluateSingleIndex() throws Exception {
+        final Sail nonPcjSail = RyaSailFactory.getInstance(conf);
+        final MongoDBRdfConfiguration pcjConf = conf.clone();
+        pcjConf.setBoolean(ConfigUtils.USE_PCJ, true);
+        final Sail pcjSail = RyaSailFactory.getInstance(pcjConf);
+        final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection();
+        final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection();
+        addPCJS(pcjConn);
+        try {
+            final String indexSparqlString = ""//
+                    + "SELECT ?e ?l ?c " //
+                    + "{" //
+                    + " ?e a ?c . "//
+                    + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
+                    + "}";//
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString);
+
+            final String queryString = ""//
+                    + "SELECT ?e ?c ?l ?o " //
+                    + "{" //
+                    + " ?e a ?c . "//
+                    + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+                    + " ?e <uri:talksTo> ?o . "//
+                    + "}";//
+
+            final CountingResultHandler nonPcjHandler = new CountingResultHandler();
+            final CountingResultHandler pcjHandler = new CountingResultHandler();
+
+            conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(nonPcjHandler);
+            pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(pcjHandler);
+
+            // Both evaluation strategies must agree on the result cardinality.
+            assertEquals(nonPcjHandler.getCount(), pcjHandler.getCount());
+        } finally {
+            conn.close();
+            pcjConn.close();
+            nonPcjSail.shutDown();
+            pcjSail.shutDown();
+        }
+    }
+
+    /**
+     * Runs the PCJ-backing query itself against both instances; after the PCJ is
+     * populated, the core Rya triples are deleted so the PCJ-enabled instance can
+     * only answer from the index. Both result counts must match.
+     */
+    @Test
+    public void testEvaluateOneIndex() throws Exception {
+        final Sail nonPcjSail = RyaSailFactory.getInstance(conf);
+        final MongoDBRdfConfiguration pcjConf = conf.clone();
+        pcjConf.setBoolean(ConfigUtils.USE_PCJ, true);
+        final Sail pcjSail = RyaSailFactory.getInstance(pcjConf);
+        final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection();
+        final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection();
+        addPCJS(pcjConn);
+        try {
+            final URI superclass = new URIImpl("uri:superclass");
+            final URI superclass2 = new URIImpl("uri:superclass2");
+
+            conn.add(subclass, RDF.TYPE, superclass);
+            conn.add(subclass2, RDF.TYPE, superclass2);
+            conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
+            conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
+
+            final String indexSparqlString = ""//
+                    + "SELECT ?dog ?pig ?duck " //
+                    + "{" //
+                    + " ?pig a ?dog . "//
+                    + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "//
+                    + "}";//
+
+            final CountingResultHandler crh1 = new CountingResultHandler();
+            final CountingResultHandler crh2 = new CountingResultHandler();
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString);
+
+            conn.prepareTupleQuery(QueryLanguage.SPARQL, indexSparqlString).evaluate(crh1);
+            PcjIntegrationTestingUtil.deleteCoreRyaTables(getMongoClient(), conf.getRyaInstanceName(), conf.getTriplesCollectionName());
+            pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, indexSparqlString).evaluate(crh2);
+
+            // Use the accessor rather than reaching into the handler's private field,
+            // matching how the other tests in this class compare counts.
+            assertEquals(crh1.getCount(), crh2.getCount());
+        } finally {
+            conn.close();
+            pcjConn.close();
+            nonPcjSail.shutDown();
+            pcjSail.shutDown();
+        }
+    }
+
+    /**
+     * Builds two PCJs, applies the PCJOptimizer to a query both can contribute to,
+     * and validates the optimized plan with the IndexPlanValidator.
+     */
+    @Test
+    public void testEvaluateTwoIndexValidate() throws Exception {
+        final Sail nonPcjSail = RyaSailFactory.getInstance(conf);
+        final MongoDBRdfConfiguration pcjConf = conf.clone();
+        pcjConf.setBoolean(ConfigUtils.USE_PCJ, true);
+        final Sail pcjSail = RyaSailFactory.getInstance(pcjConf);
+        final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection();
+        final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection();
+        addPCJS(pcjConn);
+        try {
+            final URI superclass = new URIImpl("uri:superclass");
+            final URI superclass2 = new URIImpl("uri:superclass2");
+
+            conn.add(subclass, RDF.TYPE, superclass);
+            conn.add(subclass2, RDF.TYPE, superclass2);
+            conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
+            conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
+
+            final String indexSparqlString = ""//
+                    + "SELECT ?dog ?pig ?duck " //
+                    + "{" //
+                    + " ?pig a ?dog . "//
+                    + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "//
+                    + "}";//
+
+            final String indexSparqlString2 = ""//
+                    + "SELECT ?o ?f ?e ?c ?l " //
+                    + "{" //
+                    + " ?e <uri:talksTo> ?o . "//
+                    + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?c a ?f . " //
+                    + "}";//
+
+            final String queryString = ""//
+                    + "SELECT ?e ?c ?l ?f ?o " //
+                    + "{" //
+                    + " ?e a ?c . "//
+                    + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?e <uri:talksTo> ?o . "//
+                    + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?c a ?f . " //
+                    + "}";//
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString);
+            final MongoPcjQueryNode ais1 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 1);
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 2, conf.getRyaInstanceName(), indexSparqlString2);
+            final MongoPcjQueryNode ais2 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 2);
+
+            final List<ExternalTupleSet> index = new ArrayList<>();
+            index.add(ais1);
+            index.add(ais2);
+
+            // Parse the query straight into its tuple expression; the original's null
+            // placeholder for ParsedQuery and its never-read teList are dropped.
+            final SPARQLParser sp = new SPARQLParser();
+            final ParsedQuery pq = sp.parseQuery(queryString, null);
+            final TupleExpr te = pq.getTupleExpr();
+
+            final PCJOptimizer pcj = new PCJOptimizer(index, false, new MongoPcjIndexSetProvider(new StatefulMongoDBRdfConfiguration(conf, getMongoClient())));
+            pcj.optimize(te, null, null);
+
+            final IndexPlanValidator ipv = new IndexPlanValidator(false);
+
+            assertTrue(ipv.isValid(te));
+        } finally {
+            conn.close();
+            pcjConn.close();
+            nonPcjSail.shutDown();
+            pcjSail.shutDown();
+        }
+    }
+
+    /**
+     * Builds three PCJs, applies the PCJOptimizer to a query that can use all of
+     * them, and validates the optimized plan with the IndexPlanValidator.
+     */
+    @Test
+    public void testEvaluateThreeIndexValidate() throws Exception {
+        final Sail nonPcjSail = RyaSailFactory.getInstance(conf);
+        final MongoDBRdfConfiguration pcjConf = conf.clone();
+        pcjConf.setBoolean(ConfigUtils.USE_PCJ, true);
+        final Sail pcjSail = RyaSailFactory.getInstance(pcjConf);
+        final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection();
+        final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection();
+        addPCJS(pcjConn);
+        try {
+            final URI superclass = new URIImpl("uri:superclass");
+            final URI superclass2 = new URIImpl("uri:superclass2");
+
+            final URI howlsAt = new URIImpl("uri:howlsAt");
+            final URI subType = new URIImpl("uri:subType");
+            final URI superSuperclass = new URIImpl("uri:super_superclass");
+
+            conn.add(subclass, RDF.TYPE, superclass);
+            conn.add(subclass2, RDF.TYPE, superclass2);
+            conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
+            conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
+            conn.add(sub, howlsAt, superclass);
+            conn.add(superclass, subType, superSuperclass);
+
+            final String indexSparqlString = ""//
+                    + "SELECT ?dog ?pig ?duck " //
+                    + "{" //
+                    + " ?pig a ?dog . "//
+                    + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "//
+                    + "}";//
+
+            final String indexSparqlString2 = ""//
+                    + "SELECT ?o ?f ?e ?c ?l " //
+                    + "{" //
+                    + " ?e <uri:talksTo> ?o . "//
+                    + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?c a ?f . " //
+                    + "}";//
+
+            final String indexSparqlString3 = ""//
+                    + "SELECT ?wolf ?sheep ?chicken " //
+                    + "{" //
+                    + " ?wolf <uri:howlsAt> ?sheep . "//
+                    + " ?sheep <uri:subType> ?chicken. "//
+                    + "}";//
+
+            final String queryString = ""//
+                    + "SELECT ?e ?c ?l ?f ?o " //
+                    + "{" //
+                    + " ?e a ?c . "//
+                    + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?e <uri:talksTo> ?o . "//
+                    + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "//
+                    + " ?c a ?f . " //
+                    + " ?e <uri:howlsAt> ?f. "//
+                    + " ?f <uri:subType> ?o. "//
+                    + "}";//
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString);
+            final MongoPcjQueryNode ais1 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 1);
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 2, conf.getRyaInstanceName(), indexSparqlString2);
+            final MongoPcjQueryNode ais2 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 2);
+
+            PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 3, conf.getRyaInstanceName(), indexSparqlString3);
+            final MongoPcjQueryNode ais3 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 3);
+
+            final List<ExternalTupleSet> index = new ArrayList<>();
+            index.add(ais1);
+            index.add(ais3);
+            index.add(ais2);
+
+            // Parse the query straight into its tuple expression; the original's null
+            // placeholder for ParsedQuery and its never-read teList are dropped.
+            final SPARQLParser sp = new SPARQLParser();
+            final ParsedQuery pq = sp.parseQuery(queryString, null);
+            final TupleExpr te = pq.getTupleExpr();
+
+            final PCJOptimizer pcj = new PCJOptimizer(index, false, new MongoPcjIndexSetProvider(new StatefulMongoDBRdfConfiguration(conf, getMongoClient())));
+            pcj.optimize(te, null, null);
+
+            final IndexPlanValidator ipv = new IndexPlanValidator(false);
+
+            assertTrue(ipv.isValid(te));
+        } finally {
+            conn.close();
+            pcjConn.close();
+            nonPcjSail.shutDown();
+            pcjSail.shutDown();
+        }
+    }
+
+    /**
+     * TupleQueryResultHandler that counts solutions; the tests use it to compare
+     * result cardinality between PCJ-backed and non-PCJ evaluation.
+     */
+    public static class CountingResultHandler implements TupleQueryResultHandler {
+        // Number of solutions seen since construction or the last resetCount().
+        private int count = 0;
+
+        public int getCount() {
+            return count;
+        }
+
+        public void resetCount() {
+            count = 0;
+        }
+
+        @Override
+        public void startQueryResult(final List<String> arg0) throws TupleQueryResultHandlerException {
+        }
+
+        @Override
+        public void handleSolution(final BindingSet arg0) throws TupleQueryResultHandlerException {
+            count++;
+            // Echo each solution to stdout to aid debugging test failures.
+            System.out.println(arg0);
+        }
+
+        @Override
+        public void endQueryResult() throws TupleQueryResultHandlerException {
+        }
+
+        @Override
+        public void handleBoolean(final boolean arg0) throws QueryResultHandlerException {
+
+        }
+
+        @Override
+        public void handleLinks(final List<String> arg0) throws QueryResultHandlerException {
+
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java
index 31f4e7b..d28d826 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java
@@ -19,17 +19,24 @@ package org.apache.rya.indexing.pcj.matching;
* under the License.
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.apache.rya.indexing.external.tupleSet.SimpleExternalTupleSet;
-
+import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider;
+import org.apache.rya.mongodb.EmbeddedMongoSingleton;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
import org.junit.Assert;
import org.junit.Test;
-import org.openrdf.query.MalformedQueryException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
@@ -38,14 +45,30 @@ import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+@RunWith(Parameterized.class)
public class PCJOptimizerTest {
+ private final AbstractPcjIndexSetProvider provider;
+
+ @Parameterized.Parameters
+ public static Collection providers() throws Exception {
+ final StatefulMongoDBRdfConfiguration conf = new StatefulMongoDBRdfConfiguration(new Configuration(), EmbeddedMongoSingleton.getNewMongoClient());
+ return Lists.<AbstractPcjIndexSetProvider> newArrayList(
+ new AccumuloIndexSetProvider(new Configuration()),
+ new MongoPcjIndexSetProvider(conf)
+ );
+ }
+
+ public PCJOptimizerTest(final AbstractPcjIndexSetProvider provider) {
+ this.provider = provider;
+ }
@Test
public void testBasicSegment() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " ?e a ?c . "//
@@ -53,29 +76,30 @@ public class PCJOptimizerTest {
+ " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?m" //
+ "{" //
+ " ?a a ?b . "//
+ " OPTIONAL {?a <uri:talksTo> ?m} . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(2));
- SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -86,7 +110,7 @@ public class PCJOptimizerTest {
@Test
public void testSegmentWithUnion() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " {?e <uri:p1> <uri:o1>. } UNION { ?e a ?c. OPTIONAL {?e <uri:talksTo> ?l}. ?e <uri:p5> <uri:o4>. ?e <uri:p4> <uri:o3> } . "//
@@ -94,7 +118,7 @@ public class PCJOptimizerTest {
+ " ?e <uri:p3> <uri:o2> . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?m" //
+ "{" //
+ " ?a <uri:p5> <uri:o4> ." //
@@ -103,33 +127,34 @@ public class PCJOptimizerTest {
+ " ?a a ?b . "//
+ "}";//
- String query3 = ""//
+ final String query3 = ""//
+ "SELECT ?h ?i" //
+ "{" //
+ " ?h <uri:p2> ?i . "//
+ " ?h <uri:p3> <uri:o2> . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- ParsedQuery pq3 = parser.parseQuery(query3, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr te3 = pq3.getTupleExpr();
-
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final ParsedQuery pq3 = parser.parseQuery(query3, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
+ final TupleExpr te3 = pq3.getTupleExpr();
+
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(0));
- SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
- SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
+ final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj1);
externalList.add(pcj2);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -142,7 +167,7 @@ public class PCJOptimizerTest {
public void testExactMatchLargeReOrdered() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" //
+ "{" //
+ " ?a <uri:p0> ?b ." //
@@ -155,7 +180,7 @@ public class PCJOptimizerTest {
+ " OPTIONAL{?b <uri:p4> ?o. ?o <uri:p1> ?p} . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" //
+ "{" //
+ " ?a <uri:p0> ?b ." //
@@ -168,20 +193,21 @@ public class PCJOptimizerTest {
+ " OPTIONAL{?b <uri:p3> ?e. ?e <uri:p1> ?f} . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr unOpt = te1.clone();
+ final TupleExpr unOpt = te1.clone();
- SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, new HashSet<QueryModelNode>()));
@@ -190,7 +216,7 @@ public class PCJOptimizerTest {
@Test
public void testSubsetMatchLargeReOrdered() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" //
+ "{" //
+ " ?a <uri:p0> ?b ." //
@@ -203,7 +229,7 @@ public class PCJOptimizerTest {
+ " OPTIONAL{?b <uri:p4> ?o. ?o <uri:p1> ?p} . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" //
+ "{" //
+ " ?a <uri:p0> ?b ." //
@@ -213,15 +239,15 @@ public class PCJOptimizerTest {
+ " OPTIONAL{?b <uri:p3> ?e. ?e <uri:p1> ?f} . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(8));
unMatchedNodes.add(remainingNodes.get(9));
unMatchedNodes.add(remainingNodes.get(10));
@@ -229,11 +255,12 @@ public class PCJOptimizerTest {
unMatchedNodes.add(remainingNodes.get(12));
unMatchedNodes.add(remainingNodes.get(7));
- SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -242,7 +269,7 @@ public class PCJOptimizerTest {
@Test
public void testSwitchTwoBoundVars() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?a ?b ?c " //
+ "{" //
+ " ?a <uri:p0> ?c ." //
@@ -254,7 +281,7 @@ public class PCJOptimizerTest {
+ " ?b <uri:p3> <uri:o3> " //
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?c " //
+ "{" //
+ " ?a <uri:p2> <uri:o2>. " //
@@ -264,23 +291,24 @@ public class PCJOptimizerTest {
+ " ?b<uri:p1> ?c " //
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(1));
unMatchedNodes.add(remainingNodes.get(2));
- SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -289,7 +317,7 @@ public class PCJOptimizerTest {
@Test
public void testSegmentWithLargeUnion() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " {?e <uri:p1> <uri:o1>. } UNION { " //
@@ -304,7 +332,7 @@ public class PCJOptimizerTest {
+ " ?e <uri:p3> <uri:o2> . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?c " //
+ "{" //
+ " ?a <uri:p2> <uri:o2>. " //
@@ -314,35 +342,36 @@ public class PCJOptimizerTest {
+ " ?b<uri:p1> ?c " //
+ "}";//
- String query3 = ""//
+ final String query3 = ""//
+ "SELECT ?h ?i" //
+ "{" //
+ " ?h <uri:p2> ?i . "//
+ " ?h <uri:p3> <uri:o2> . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- ParsedQuery pq3 = parser.parseQuery(query3, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr te3 = pq3.getTupleExpr();
-
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final ParsedQuery pq3 = parser.parseQuery(query3, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
+ final TupleExpr te3 = pq3.getTupleExpr();
+
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(0));
unMatchedNodes.add(remainingNodes.get(2));
unMatchedNodes.add(remainingNodes.get(3));
- SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
- SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
+ final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj1);
externalList.add(pcj2);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -352,7 +381,7 @@ public class PCJOptimizerTest {
@Test
public void testSegmentWithUnionAndFilters() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " Filter(?e = <uri:s1>) " //
@@ -362,7 +391,7 @@ public class PCJOptimizerTest {
+ " ?e <uri:p3> <uri:o2> . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?m" //
+ "{" //
+ " Filter(?b = <uri:s2>) " //
@@ -372,7 +401,7 @@ public class PCJOptimizerTest {
+ " ?a a ?b . "//
+ "}";//
- String query3 = ""//
+ final String query3 = ""//
+ "SELECT ?h ?i" //
+ "{" //
+ " Filter(?h = <uri:s1>) " //
@@ -380,26 +409,27 @@ public class PCJOptimizerTest {
+ " ?h <uri:p3> <uri:o2> . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- ParsedQuery pq3 = parser.parseQuery(query3, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr te3 = pq3.getTupleExpr();
-
- TupleExpr unOpt = te1.clone();
- List<QueryModelNode> remainingNodes = getNodes(te1);
- Set<QueryModelNode> unMatchedNodes = new HashSet<>();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final ParsedQuery pq3 = parser.parseQuery(query3, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
+ final TupleExpr te3 = pq3.getTupleExpr();
+
+ final TupleExpr unOpt = te1.clone();
+ final List<QueryModelNode> remainingNodes = getNodes(te1);
+ final Set<QueryModelNode> unMatchedNodes = new HashSet<>();
unMatchedNodes.add(remainingNodes.get(0));
- SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
- SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
+ final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj1);
externalList.add(pcj2);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes));
@@ -409,7 +439,7 @@ public class PCJOptimizerTest {
@Test
public void testSegmentWithLeftJoinsAndFilters() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " Filter(?e = <uri:s1>) " //
@@ -419,14 +449,14 @@ public class PCJOptimizerTest {
+ " OPTIONAL {?e <uri:p2> ?c } . "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " Filter(?c = <uri:s2>) " //
+ " ?e <uri:p1> <uri:o1>. " + " OPTIONAL {?e <uri:p2> ?l}. " + " ?c <uri:p3> <uri:o3> . "//
+ "}";//
- String query3 = ""//
+ final String query3 = ""//
+ "SELECT ?e ?c" //
+ "{" //
+ " Filter(?e = <uri:s1>) " //
@@ -434,23 +464,24 @@ public class PCJOptimizerTest {
+ " OPTIONAL {?e <uri:p2> ?c } . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- ParsedQuery pq3 = parser.parseQuery(query3, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr te3 = pq3.getTupleExpr();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final ParsedQuery pq3 = parser.parseQuery(query3, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
+ final TupleExpr te3 = pq3.getTupleExpr();
- TupleExpr unOpt = te1.clone();
+ final TupleExpr unOpt = te1.clone();
- SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
- SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2);
+ final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj1);
externalList.add(pcj2);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, new HashSet<QueryModelNode>()));
@@ -459,7 +490,7 @@ public class PCJOptimizerTest {
@Test
public void testJoinMatcherRejectsLeftJoinPcj() throws Exception {
- String query1 = ""//
+ final String query1 = ""//
+ "SELECT ?e ?c ?l" //
+ "{" //
+ " ?e a ?c . "//
@@ -467,7 +498,7 @@ public class PCJOptimizerTest {
+ " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
+ "}";//
- String query2 = ""//
+ final String query2 = ""//
+ "SELECT ?a ?b ?m" //
+ "{" //
+ " ?a a ?b . "//
@@ -475,43 +506,44 @@ public class PCJOptimizerTest {
+ " OPTIONAL {?a <http://www.w3.org/2000/01/rdf-schema#label> ?m} . "//
+ "}";//
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = parser.parseQuery(query1, null);
- ParsedQuery pq2 = parser.parseQuery(query2, null);
- TupleExpr te1 = pq1.getTupleExpr();
- TupleExpr te2 = pq2.getTupleExpr();
- TupleExpr expected = te1.clone();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery pq1 = parser.parseQuery(query1, null);
+ final ParsedQuery pq2 = parser.parseQuery(query2, null);
+ final TupleExpr te1 = pq1.getTupleExpr();
+ final TupleExpr te2 = pq2.getTupleExpr();
+ final TupleExpr expected = te1.clone();
- SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
- List<ExternalTupleSet> externalList = new ArrayList<>();
+ final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2);
+ final List<ExternalTupleSet> externalList = new ArrayList<>();
externalList.add(pcj);
- PCJOptimizer optimizer = new PCJOptimizer(externalList, false);
+ provider.setIndices(externalList);
+ final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider);
optimizer.optimize(te1, null, null);
Assert.assertEquals(expected, te1);
}
- private List<QueryModelNode> getNodes(TupleExpr te) {
- NodeCollector collector = new NodeCollector();
+ private List<QueryModelNode> getNodes(final TupleExpr te) {
+ final NodeCollector collector = new NodeCollector();
te.visit(collector);
return collector.getNodes();
}
- private boolean validatePcj(TupleExpr optTupleExp, TupleExpr unOptTup, List<ExternalTupleSet> pcjs, Set<QueryModelNode> expUnmatchedNodes) {
+ private boolean validatePcj(final TupleExpr optTupleExp, final TupleExpr unOptTup, final List<ExternalTupleSet> pcjs, final Set<QueryModelNode> expUnmatchedNodes) {
- IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(
+ final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(
unOptTup, pcjs);
- List<ExternalTupleSet> indexList = iep.getNormalizedIndices();
- Set<QueryModelNode> indexSet = new HashSet<>();
- for(ExternalTupleSet etup: indexList) {
+ final List<ExternalTupleSet> indexList = iep.getNormalizedIndices();
+ final Set<QueryModelNode> indexSet = new HashSet<>();
+ for(final ExternalTupleSet etup: indexList) {
indexSet.add(etup);
}
- Set<QueryModelNode> tupNodes = Sets.newHashSet(getNodes(optTupleExp));
+ final Set<QueryModelNode> tupNodes = Sets.newHashSet(getNodes(optTupleExp));
- Set<QueryModelNode> diff = Sets.difference(tupNodes, indexSet);
+ final Set<QueryModelNode> diff = Sets.difference(tupNodes, indexSet);
return diff.equals(expUnmatchedNodes);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
index e19e965..e75d499 100644
--- a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java
@@ -113,8 +113,8 @@ public class KafkaLatencyBenchmark implements AutoCloseable {
public KafkaLatencyBenchmark(final CommonOptions options, final BenchmarkOptions benchmarkOptions) throws AccumuloException, AccumuloSecurityException {
this.options = Objects.requireNonNull(options);
this.benchmarkOptions = Objects.requireNonNull(benchmarkOptions);
- this.client = Objects.requireNonNull(options.buildRyaClient());
- this.startTime = LocalDateTime.now();
+ client = Objects.requireNonNull(options.buildRyaClient());
+ startTime = LocalDateTime.now();
logger.info("Running {} with the following input parameters:\n{}\n{}", this.getClass(), options, benchmarkOptions);
}
@@ -172,7 +172,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable {
+ "group by ?type";
logger.info("Query: {}", sparql);
- return client.getCreatePCJ().get().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA));
+ return client.getCreatePCJ().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA));
}
private String issuePeriodicQuery(final PeriodicQueryCommand periodicOptions) throws InstanceDoesNotExistException, RyaClientException {
@@ -352,7 +352,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable {
logger.info("Publishing {} Observations", observationsPerIteration);
final long t1 = System.currentTimeMillis();
loadStatements.loadStatements(ryaInstanceName, statements);
- logger.info("Published {} observations in in {}s", observationsPerIteration, ((System.currentTimeMillis() - t1)/1000.0));
+ logger.info("Published {} observations in in {}s", observationsPerIteration, (System.currentTimeMillis() - t1)/1000.0);
logger.info("Updating published totals...");
for(int typeId = 0; typeId < numTypes; typeId++) {
typeToStatMap.get(typePrefix + typeId).total.addAndGet(observationsPerTypePerIteration);
@@ -367,7 +367,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable {
}
public void setShutdownOperation(final Runnable f) {
- this.shutdownOperation = f;
+ shutdownOperation = f;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
index 6841048..cf4ca8f 100644
--- a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
@@ -28,9 +28,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Queue;
+import org.apache.hadoop.conf.Configuration;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.apache.rya.indexing.external.tupleSet.SimpleExternalTupleSet;
import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
+import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
@@ -106,12 +108,12 @@ public class PCJOptimizerBenchmark {
final BenchmarkValues chainedValues = new BenchmarkValues(
makeChainedQuery(benchmarkParams),
makeChainedPCJOptimizer(benchmarkParams));
- this.chainedBenchmarkValues.put(benchmarkParams, chainedValues);
+ chainedBenchmarkValues.put(benchmarkParams, chainedValues);
final BenchmarkValues unchainedValues = new BenchmarkValues(
makeUnchainedQuery(benchmarkParams),
makeUnchainedPCJOptimizer(benchmarkParams));
- this.unchainedBenchmarkValues.put(benchmarkParams, unchainedValues);
+ unchainedBenchmarkValues.put(benchmarkParams, unchainedValues);
}
}
}
@@ -215,7 +217,7 @@ public class PCJOptimizerBenchmark {
}
// Create the optimizer.
- return new PCJOptimizer(indices, false);
+ return new PCJOptimizer(indices, false, new AccumuloIndexSetProvider(new Configuration()));
}
private static PCJOptimizer makeChainedPCJOptimizer(final BenchmarkParams params) throws Exception {
@@ -252,7 +254,7 @@ public class PCJOptimizerBenchmark {
}
// Create the optimizer.
- return new PCJOptimizer(indices, false);
+ return new PCJOptimizer(indices, false, new AccumuloIndexSetProvider(new Configuration()));
}
private static String buildUnchainedSPARQL(final List<String> vars) {
@@ -274,8 +276,8 @@ public class PCJOptimizerBenchmark {
}
return "select " + Joiner.on(" ").join(vars) + " where { " +
- Joiner.on(" . ").join(statementPatterns) +
- " . }" ;
+ Joiner.on(" . ").join(statementPatterns) +
+ " . }" ;
}
private static String buildChainedSPARQL(final List<String> vars) {
@@ -298,8 +300,8 @@ public class PCJOptimizerBenchmark {
// Build the SPARQL query from the pieces.
return "select " + Joiner.on(" ").join(vars) + " where { " +
- Joiner.on(" . ").join(statementPatterns) +
- " . }" ;
+ Joiner.on(" . ").join(statementPatterns) +
+ " . }" ;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
index cc5ba8b..dd5fe68 100644
--- a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
@@ -148,7 +148,7 @@ public class QueryBenchmarkRunIT {
final String pcjId = pcjs.createPcj(SPARQL_QUERY);
// Batch update the PCJ using the Rya Client.
- ryaClient.getBatchUpdatePCJ().get().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+ ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/pom.xml b/extras/rya.indexing.pcj/pom.xml
index 797c2fb..1d4a4b6 100644
--- a/extras/rya.indexing.pcj/pom.xml
+++ b/extras/rya.indexing.pcj/pom.xml
@@ -48,6 +48,10 @@ under the License.
<groupId>org.apache.rya</groupId>
<artifactId>accumulo.rya</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>mongodb.rya</artifactId>
+ </dependency>
<!-- Accumulo support dependencies. -->
<dependency>
@@ -90,5 +94,11 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>mongodb.rya</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java
new file mode 100644
index 0000000..010f8bc
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter;
+import org.bson.Document;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Converts {@link BindingSet}s into other representations. This library is
+ * intended to convert between BindingSet and {@link Document}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface MongoBindingSetConverter extends BindingSetConverter<Document> {
+
+ /**
+ * Converts a {@link BindingSet} into a MongoDB model.
+ *
+ * @param bindingSet - The BindingSet that will be converted. (not null)
+ * @return The BindingSet formatted as Mongo Bson object.
+ * @throws BindingSetConversionException The BindingSet was unable to be
+ * converted. This will happen if one of the values could not be
+ * converted into the target model.
+ */
+ public Document convert(BindingSet bindingSet) throws BindingSetConversionException;
+
+ /**
+ * Converts a MongoDB model into a {@link BindingSet}.
+ *
+ * @param bindingSet - The bson that will be converted. (not null)
+ * @return The BindingSet created from a Mongo Bson object.
+ * @throws BindingSetConversionException The Bson was unable to be
+ * converted. This will happen if one of the values could not be
+ * converted into a BindingSet.
+ */
+ public BindingSet convert(Document bindingSet) throws BindingSetConversionException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java
new file mode 100644
index 0000000..ecfbc1c
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.util.JSON;
+
+/**
+ * Creates and modifies PCJs in MongoDB. PCJs are stored as follows:
+ *
+ * <pre>
+ * <code>
+ * ----- PCJ Metadata Doc -----
+ * {
+ *   _id: [pcj_ID]_METADATA,
+ *   sparql: [sparql query to match results],
+ *   varOrders: [varOrder1, VarOrder2, ..., VarOrdern]
+ *   cardinality: [number of results]
+ * }
+ *
+ * ----- PCJ Results Doc -----
+ * {
+ *   pcjId: [pcj_ID],
+ *   visibilities: [visibilities]
+ *   [binding_var1]: {
+ *     rdfType: [type_uri],
+ *     value: [value]
+ *   }
+ *   .
+ *   .
+ *   .
+ *   [binding_varn]: {
+ *     rdfType: [type_uri],
+ *     value: [value]
+ *   }
+ * }
+ * </code>
+ * </pre>
+ */
+public class MongoPcjDocuments {
+    public static final String PCJ_COLLECTION_NAME = "pcjs";
+
+    // metadata fields
+    public static final String CARDINALITY_FIELD = "cardinality";
+    public static final String SPARQL_FIELD = "sparql";
+    public static final String PCJ_METADATA_ID = "_id";
+    public static final String VAR_ORDER_FIELD = "varOrders";
+
+    // Suffix appended to a PCJ Id to form the _id of its metadata document.
+    private static final String METADATA_SUFFIX = "_METADATA";
+
+    // pcj results fields
+    private static final String BINDING_VALUE = "value";
+    private static final String BINDING_TYPE = "rdfType";
+    private static final String VISIBILITIES_FIELD = "visibilities";
+    private static final String PCJ_ID = "pcjId";
+
+    // Number of historic results buffered before each bulk write to Mongo.
+    private static final int BATCH_SIZE = 1000;
+
+    private final MongoCollection<Document> pcjCollection;
+    private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory();
+
+    /**
+     * Creates a new {@link MongoPcjDocuments}.
+     * @param client - The {@link MongoClient} to use to connect to mongo. (not null)
+     * @param ryaInstanceName - The rya instance to connect to. (not null)
+     */
+    public MongoPcjDocuments(final MongoClient client, final String ryaInstanceName) {
+        requireNonNull(client);
+        requireNonNull(ryaInstanceName);
+        pcjCollection = client.getDatabase(ryaInstanceName).getCollection(PCJ_COLLECTION_NAME);
+    }
+
+    /**
+     * @param pcjId - Uniquely identifies a PCJ within Rya.
+     * @return The {@code _id} used for the PCJ's metadata document.
+     */
+    private String makeMetadataID(final String pcjId) {
+        return pcjId + METADATA_SUFFIX;
+    }
+
+    /**
+     * Creates a {@link Document} containing the metadata defining the PCJ.
+     *
+     * @param pcjId - Uniquely identifies a PCJ within Rya. (not null)
+     * @param sparql - The sparql query the PCJ will use. (not null)
+     * @return The document built around the provided metadata.
+     * @throws PCJStorageException - Thrown when the sparql query is malformed.
+     */
+    public Document makeMetadataDocument(final String pcjId, final String sparql) throws PCJStorageException {
+        requireNonNull(pcjId);
+        requireNonNull(sparql);
+
+        final Set<VariableOrder> varOrders;
+        try {
+            varOrders = pcjVarOrderFactory.makeVarOrders(sparql);
+        } catch (final MalformedQueryException e) {
+            throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e);
+        }
+
+        // Store each VariableOrder as a plain list of variable names so the value
+        // round-trips through Mongo without a custom codec and matches the
+        // List<List<String>> shape that getPcjMetadata() reads back.
+        final List<List<String>> varOrderLists = new ArrayList<>();
+        for (final VariableOrder varOrder : varOrders) {
+            varOrderLists.add(varOrder.getVariableOrders());
+        }
+
+        return new Document()
+                .append(PCJ_METADATA_ID, makeMetadataID(pcjId))
+                .append(SPARQL_FIELD, sparql)
+                .append(CARDINALITY_FIELD, 0)
+                .append(VAR_ORDER_FIELD, varOrderLists);
+    }
+
+    /**
+     * Creates a new PCJ based on the provided metadata. The initial pcj results
+     * will be empty.
+     *
+     * @param pcjId - Uniquely identifies a PCJ within Rya.
+     * @param sparql - The query the pcj is assigned to.
+     * @throws PCJStorageException - Thrown when the sparql query is malformed.
+     */
+    public void createPcj(final String pcjId, final String sparql) throws PCJStorageException {
+        pcjCollection.insertOne(makeMetadataDocument(pcjId, sparql));
+    }
+
+    /**
+     * Creates a new PCJ document and populates it by scanning an instance of
+     * Rya for historic matches.
+     * <p>
+     * If any portion of this operation fails along the way, the partially
+     * created PCJ documents will be left in Mongo.
+     *
+     * @param ryaConn - Connects to the Rya that will be scanned. (not null)
+     * @param pcjId - Uniquely identifies a PCJ within Rya. (not null)
+     * @param sparql - The SPARQL query whose results will be loaded into the PCJ results document. (not null)
+     * @throws PCJStorageException The PCJ documents could not be created or the
+     *         values from Rya were not able to be loaded into it.
+     */
+    public void createAndPopulatePcj(
+            final RepositoryConnection ryaConn,
+            final String pcjId,
+            final String sparql) throws PCJStorageException {
+        checkNotNull(ryaConn);
+        checkNotNull(pcjId);
+        checkNotNull(sparql);
+
+        // Create the PCJ document in Mongo.
+        createPcj(pcjId, sparql);
+
+        // Load historic matches from Rya into the PCJ results document.
+        populatePcj(pcjId, ryaConn);
+    }
+
+    /**
+     * Gets the {@link PcjMetadata} from a provided PCJ Id.
+     *
+     * @param pcjId - The Id of the PCJ to get from MongoDB. (not null)
+     * @return - The {@link PcjMetadata} of the Pcj specified.
+     * @throws PCJStorageException The PCJ metadata document does not exist.
+     */
+    public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
+        requireNonNull(pcjId);
+
+        // Since the query is by _id, there is at most one match.
+        final Document result = pcjCollection.find(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId))).first();
+
+        if (result == null) {
+            throw new PCJStorageException("The PCJ: " + pcjId + " does not exist.");
+        }
+
+        final String sparql = result.getString(SPARQL_FIELD);
+        final int cardinality = result.getInteger(CARDINALITY_FIELD, 0);
+        @SuppressWarnings("unchecked") // Written by makeMetadataDocument() as a list of string lists.
+        final List<List<String>> varOrders = (List<List<String>>) result.get(VAR_ORDER_FIELD);
+        final Set<VariableOrder> varOrder = new HashSet<>();
+        for (final List<String> vars : varOrders) {
+            varOrder.add(new VariableOrder(vars));
+        }
+
+        return new PcjMetadata(sparql, cardinality, varOrder);
+    }
+
+    /**
+     * Adds binding set results to a specific PCJ. A no-op when {@code results}
+     * is empty.
+     *
+     * @param pcjId - Uniquely identifies a PCJ within Rya. (not null)
+     * @param results - The binding set results. (not null)
+     */
+    public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) {
+        checkNotNull(pcjId);
+        checkNotNull(results);
+
+        final List<Document> pcjDocs = new ArrayList<>();
+        for (final VisibilityBindingSet vbs : results) {
+            // Each binding set gets its own doc.
+            final Document bindingDoc = new Document(PCJ_ID, pcjId);
+            vbs.forEach(binding -> {
+                final RyaType type = RdfToRyaConversions.convertValue(binding.getValue());
+                bindingDoc.append(binding.getName(),
+                        new Document()
+                            .append(BINDING_TYPE, type.getDataType().stringValue())
+                            .append(BINDING_VALUE, type.getData())
+                        );
+            });
+            bindingDoc.append(VISIBILITIES_FIELD, vbs.getVisibility());
+            pcjDocs.add(bindingDoc);
+        }
+
+        if (pcjDocs.isEmpty()) {
+            // Nothing to write; the driver rejects insertMany() with an empty list.
+            return;
+        }
+        pcjCollection.insertMany(pcjDocs);
+
+        // update cardinality in the metadata doc.
+        final int appendCardinality = pcjDocs.size();
+        final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId));
+        final Bson update = new Document("$inc", new Document(CARDINALITY_FIELD, appendCardinality));
+        pcjCollection.updateOne(query, update);
+    }
+
+    /**
+     * Purges all results from the PCJ results document with the provided Id.
+     *
+     * @param pcjId - The Id of the PCJ to purge. (not null)
+     */
+    public void purgePcjs(final String pcjId) {
+        requireNonNull(pcjId);
+
+        // remove every doc for the pcj, except the metadata
+        final Bson filter = new Document(PCJ_ID, pcjId);
+        pcjCollection.deleteMany(filter);
+
+        // reset cardinality
+        final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId));
+        final Bson update = new Document("$set", new Document(CARDINALITY_FIELD, 0));
+        pcjCollection.updateOne(query, update);
+    }
+
+    /**
+     * Scan Rya for results that solve the PCJ's query and store them in the PCJ
+     * document.
+     * <p>
+     * This method assumes the PCJ document has already been created.
+     *
+     * @param pcjId - The Id of the PCJ that will receive the results. (not null)
+     * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null)
+     * @throws PCJStorageException If results could not be written to the PCJ results document,
+     *         the PCJ results document does not exist, or the query that is being executed was malformed.
+     */
+    public void populatePcj(final String pcjId, final RepositoryConnection ryaConn) throws PCJStorageException {
+        checkNotNull(pcjId);
+        checkNotNull(ryaConn);
+
+        try {
+            // Fetch the query that needs to be executed from the PCJ metadata document.
+            final PcjMetadata pcjMetadata = getPcjMetadata(pcjId);
+            final String sparql = pcjMetadata.getSparql();
+
+            // Query Rya for results to the SPARQL query.
+            final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
+            final TupleQueryResult results = query.evaluate();
+
+            // Load the results into the PCJ results document in batches.
+            final Set<VisibilityBindingSet> batch = new HashSet<>(BATCH_SIZE);
+            while (results.hasNext()) {
+                batch.add(new VisibilityBindingSet(results.next()));
+                if (batch.size() == BATCH_SIZE) {
+                    addResults(pcjId, batch);
+                    batch.clear();
+                }
+            }
+
+            // Flush the final partial batch.
+            if (!batch.isEmpty()) {
+                addResults(pcjId, batch);
+            }
+
+        } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) {
+            throw new PCJStorageException(
+                    "Could not populate a PCJ document with Rya results for the pcj with Id: " + pcjId, e);
+        }
+    }
+
+    /**
+     * List the document Ids of the PCJs that are stored in MongoDB
+     * for this instance of Rya.
+     *
+     * @return A list of pcj document Ids that hold PCJ index data for the current
+     *         instance of Rya.
+     */
+    public List<String> listPcjDocuments() {
+        final List<String> pcjIds = new ArrayList<>();
+
+        // Metadata documents are the only ones whose _id carries the metadata
+        // suffix; result documents have an auto-generated ObjectId. Filtering
+        // here keeps result documents out of the returned Id list.
+        final Bson metadataFilter = new Document(PCJ_METADATA_ID, new Document("$regex", ".*" + METADATA_SUFFIX + "$"));
+        try (final MongoCursor<Document> iter = pcjCollection.find(metadataFilter).iterator()) {
+            while (iter.hasNext()) {
+                pcjIds.add(iter.next().get(PCJ_METADATA_ID).toString().replace(METADATA_SUFFIX, ""));
+            }
+        }
+
+        return pcjIds;
+    }
+
+    /**
+     * Returns all of the results of a PCJ.
+     *
+     * @param pcjId - The PCJ to get the results for. (not null)
+     * @return The authorized PCJ results.
+     */
+    public CloseableIterator<BindingSet> listResults(final String pcjId) {
+        requireNonNull(pcjId);
+
+        // get all results based on pcjId
+        return queryForBindings(new Document(PCJ_ID, pcjId));
+    }
+
+    /**
+     * Retrieves the stored {@link BindingSet} results for the provided pcjId.
+     *
+     * @param pcjId - The Id of the PCJ to retrieve results from.
+     * @param restrictionBindings - The collection of {@link BindingSet}s to restrict results.
+     * <p>
+     * Note: the result restrictions from {@link BindingSet}s are an OR
+     * over ANDS in that: <code>
+     * [
+     *   bindingset: binding AND binding AND binding,
+     *   OR
+     *   bindingset: binding AND binding AND binding,
+     *   .
+     *   .
+     *   .
+     *   OR
+     *   bindingset: binding
+     * ]
+     * </code>
+     * @return The PCJ results that match at least one of the restriction binding sets.
+     */
+    public CloseableIterator<BindingSet> getResults(final String pcjId, final Collection<BindingSet> restrictionBindings) {
+        // empty bindings return all results.
+        if (restrictionBindings.size() == 1 && restrictionBindings.iterator().next().size() == 0) {
+            return listResults(pcjId);
+        }
+
+        final Document query = new Document(PCJ_ID, pcjId);
+        final List<Document> bindingSetList = new ArrayList<>();
+        restrictionBindings.forEach(bindingSet -> {
+            final Document bindingDoc = new Document();
+            final List<Document> bindings = new ArrayList<>();
+            bindingSet.forEach(binding -> {
+                final RyaType type = RdfToRyaConversions.convertValue(binding.getValue());
+                final Document typeDoc = new Document()
+                        .append(BINDING_TYPE, type.getDataType().stringValue())
+                        .append(BINDING_VALUE, type.getData());
+                bindings.add(new Document(binding.getName(), typeDoc));
+            });
+            bindingDoc.append("$and", bindings);
+            bindingSetList.add(bindingDoc);
+        });
+
+        // OR together the per-BindingSet AND clauses and attach them to the
+        // query. (Previously this filter was built but never applied, so the
+        // restriction bindings were silently ignored.)
+        query.append("$or", bindingSetList);
+        return queryForBindings(query);
+    }
+
+    /**
+     * Runs the provided query and adapts the matching documents into
+     * {@link BindingSet}s.
+     *
+     * @param query - The Mongo query to execute. (not null)
+     * @return A {@link CloseableIterator} over the resulting binding sets;
+     *         closing it releases the underlying Mongo cursor.
+     */
+    private CloseableIterator<BindingSet> queryForBindings(final Document query) {
+        final MongoCursor<Document> resultsIter = pcjCollection.find(query).iterator();
+        return new CloseableIterator<BindingSet>() {
+            @Override
+            public boolean hasNext() {
+                return resultsIter.hasNext();
+            }
+
+            @Override
+            public BindingSet next() {
+                final Document bs = resultsIter.next();
+                final MapBindingSet binding = new MapBindingSet();
+                for (final String key : bs.keySet()) {
+                    if (key.equals(VISIBILITIES_FIELD)) {
+                        // Has auths, is a visibility binding set. The plain
+                        // BindingSet returned here does not carry visibilities.
+                    } else if (!key.equals("_id") && !key.equals(PCJ_ID)) {
+                        // is the binding value.
+                        final Document typeDoc = (Document) bs.get(key);
+                        final URI dataType = new URIImpl(typeDoc.getString(BINDING_TYPE));
+                        final RyaType type = new RyaType(dataType, typeDoc.getString(BINDING_VALUE));
+                        final Value value = RyaToRdfConversions.convertValue(type);
+                        binding.addBinding(key, value);
+                    }
+                }
+                return binding;
+            }
+
+            @Override
+            public void close() throws Exception {
+                // Release the server-side cursor. (The original left this as a
+                // no-op, leaking the MongoCursor.)
+                resultsIter.close();
+            }
+        };
+    }
+
+    /**
+     * Drops a pcj based on the PCJ Id. Removing the entire document from Mongo.
+     *
+     * @param pcjId - The identifier for the PCJ to remove.
+     */
+    public void dropPcj(final String pcjId) {
+        purgePcjs(pcjId);
+        pcjCollection.deleteOne(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId)));
+    }
+}
\ No newline at end of file