You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 16:44:52 UTC
[3/5] incubator-rya git commit: RYA-332 rename integration tests.
Closes #212.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java
deleted file mode 100644
index 1896025..0000000
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java
+++ /dev/null
@@ -1,505 +0,0 @@
-package org.apache.rya.indexing.external;
-
-import java.net.UnknownHostException;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.model.vocabulary.RDFS;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.QueryLanguage;
-import org.openrdf.query.QueryResultHandlerException;
-import org.openrdf.query.TupleQueryResultHandler;
-import org.openrdf.query.TupleQueryResultHandlerException;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Optional;
-
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
-
-public class PrecompJoinOptimizerIntegrationTest {
-
- private SailRepositoryConnection conn, pcjConn;
- private SailRepository repo, pcjRepo;
- private Connector accCon;
- String tablePrefix = "table_";
- URI sub, sub2, obj, obj2, subclass, subclass2, talksTo;
-
- @Before
- public void init() throws RepositoryException,
- TupleQueryResultHandlerException, QueryEvaluationException,
- MalformedQueryException, AccumuloException,
- AccumuloSecurityException, TableExistsException, RyaDAOException,
- TableNotFoundException, InferenceEngineException, NumberFormatException,
- UnknownHostException, SailException {
-
- repo = PcjIntegrationTestingUtil.getNonPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
-
- pcjRepo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- pcjConn = pcjRepo.getConnection();
-
- sub = new URIImpl("uri:entity");
- subclass = new URIImpl("uri:class");
- obj = new URIImpl("uri:obj");
- talksTo = new URIImpl("uri:talksTo");
-
- conn.add(sub, RDF.TYPE, subclass);
- conn.add(sub, RDFS.LABEL, new LiteralImpl("label"));
- conn.add(sub, talksTo, obj);
-
- sub2 = new URIImpl("uri:entity2");
- subclass2 = new URIImpl("uri:class2");
- obj2 = new URIImpl("uri:obj2");
-
- conn.add(sub2, RDF.TYPE, subclass2);
- conn.add(sub2, RDFS.LABEL, new LiteralImpl("label2"));
- conn.add(sub2, talksTo, obj2);
-
- accCon = new MockInstance("instance").getConnector("root",
- new PasswordToken(""));
-
- }
-
- @After
- public void close() throws RepositoryException, AccumuloException,
- AccumuloSecurityException, TableNotFoundException {
-
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- PcjIntegrationTestingUtil.closeAndShutdown(pcjConn, pcjRepo);
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.deleteIndexTables(accCon, 2, tablePrefix);
-
- }
-
- @Test
- public void testEvaluateSingeIndex()
- throws TupleQueryResultHandlerException, QueryEvaluationException,
- MalformedQueryException, RepositoryException, AccumuloException,
- AccumuloSecurityException, TableExistsException, RyaDAOException,
- SailException, TableNotFoundException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- final String indexSparqlString = ""//
- + "SELECT ?e ?l ?c " //
- + "{" //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" },
- Optional.<PcjVarOrderFactory> absent());
- final String queryString = ""//
- + "SELECT ?e ?c ?l ?o " //
- + "{" //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
- + " ?e <uri:talksTo> ?o . "//
- + "}";//
-
- final CountingResultHandler crh = new CountingResultHandler();
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub, talksTo, obj);
- conn.add(sub2, talksTo, obj2);
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(crh);
-
- Assert.assertEquals(2, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateTwoIndexTwoVarOrder1() throws AccumuloException,
- AccumuloSecurityException, TableExistsException,
- RepositoryException, MalformedQueryException, SailException,
- QueryEvaluationException, TableNotFoundException,
- TupleQueryResultHandlerException, RyaDAOException, PcjException {
-
- conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
- conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
-
- final String indexSparqlString = ""//
- + "SELECT ?e ?l ?c " //
- + "{" //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String indexSparqlString2 = ""//
- + "SELECT ?e ?o ?l " //
- + "{" //
- + " ?e <uri:talksTo> ?o . "//
- + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String queryString = ""//
- + "SELECT ?e ?c ?l ?o " //
- + "{" //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
- + " ?e <uri:talksTo> ?o . "//
- + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" },
- Optional.<PcjVarOrderFactory> absent());
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" },
- Optional.<PcjVarOrderFactory> absent());
- final CountingResultHandler crh = new CountingResultHandler();
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(2, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateSingeFilterIndex()
- throws TupleQueryResultHandlerException, QueryEvaluationException,
- MalformedQueryException, RepositoryException, AccumuloException,
- AccumuloSecurityException, TableExistsException, RyaDAOException,
- SailException, TableNotFoundException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- final String indexSparqlString = ""//
- + "SELECT ?e ?l ?c " //
- + "{" //
- + " Filter(?e = <uri:entity>) " //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" },
- Optional.<PcjVarOrderFactory> absent());
- final String queryString = ""//
- + "SELECT ?e ?c ?l ?o " //
- + "{" //
- + " Filter(?e = <uri:entity>) " //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
- + " ?e <uri:talksTo> ?o . "//
- + "}";//
-
- final CountingResultHandler crh = new CountingResultHandler();
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub, talksTo, obj);
- conn.add(sub2, talksTo, obj2);
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(1, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateSingeFilterWithUnion()
- throws TupleQueryResultHandlerException, QueryEvaluationException,
- MalformedQueryException, RepositoryException, AccumuloException,
- AccumuloSecurityException, TableExistsException, RyaDAOException,
- SailException, TableNotFoundException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- final String indexSparqlString2 = ""//
- + "SELECT ?e ?l ?c " //
- + "{" //
- + " Filter(?l = \"label2\") " //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "c" },
- Optional.<PcjVarOrderFactory> absent());
-
- final String queryString = ""//
- + "SELECT ?e ?c ?o ?m ?l" //
- + "{" //
- + " Filter(?l = \"label2\") " //
- + " ?e <uri:talksTo> ?o . "//
- + " { ?e a ?c . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?m }"//
- + " UNION { ?e a ?c . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"//
- + "}";//
-
- final CountingResultHandler crh = new CountingResultHandler();
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub, talksTo, obj);
- conn.add(sub2, talksTo, obj2);
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(1, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateSingeFilterWithLeftJoin()
- throws TupleQueryResultHandlerException, QueryEvaluationException,
- MalformedQueryException, RepositoryException, AccumuloException,
- AccumuloSecurityException, TableExistsException, RyaDAOException,
- SailException, TableNotFoundException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- final String indexSparqlString1 = ""//
- + "SELECT ?e ?l ?c " //
- + "{" //
- + " Filter(?l = \"label3\") " //
- + " ?e a ?c . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final URI sub3 = new URIImpl("uri:entity3");
- final URI subclass3 = new URIImpl("uri:class3");
- conn.add(sub3, RDF.TYPE, subclass3);
- conn.add(sub3, RDFS.LABEL, new LiteralImpl("label3"));
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString1, new String[] { "e", "l", "c" },
- Optional.<PcjVarOrderFactory> absent());
- final String queryString = ""//
- + "SELECT ?e ?c ?o ?m ?l" //
- + "{" //
- + " Filter(?l = \"label3\") " //
- + " ?e a ?c . " //
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . " //
- + " OPTIONAL { ?e <uri:talksTo> ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?m }"//
- + "}";//
-
- final CountingResultHandler crh = new CountingResultHandler();
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub, talksTo, obj);
- conn.add(sub, RDFS.LABEL, new LiteralImpl("label"));
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(1, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateTwoIndexUnionFilter() throws AccumuloException,
- AccumuloSecurityException, TableExistsException,
- RepositoryException, MalformedQueryException, SailException,
- QueryEvaluationException, TableNotFoundException,
- TupleQueryResultHandlerException, RyaDAOException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
- conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
- conn.add(sub, RDF.TYPE, obj);
- conn.add(sub2, RDF.TYPE, obj2);
-
- final String indexSparqlString = ""//
- + "SELECT ?e ?l ?o " //
- + "{" //
- + " Filter(?l = \"label2\") " //
- + " ?e a ?o . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String indexSparqlString2 = ""//
- + "SELECT ?e ?l ?o " //
- + "{" //
- + " Filter(?l = \"label2\") " //
- + " ?e <uri:talksTo> ?o . "//
- + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String queryString = ""//
- + "SELECT ?c ?e ?l ?o " //
- + "{" //
- + " Filter(?l = \"label2\") " //
- + " ?e a ?c . "//
- + " { ?e a ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"//
- + " UNION { ?e <uri:talksTo> ?o . ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l }"//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString, new String[] { "e", "l", "o" },
- Optional.<PcjVarOrderFactory> absent());
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" },
- Optional.<PcjVarOrderFactory> absent());
-
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub2, RDF.TYPE, subclass2);
- conn.add(sub2, RDF.TYPE, obj2);
- final CountingResultHandler crh = new CountingResultHandler();
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(6, crh.getCount());
-
- }
-
- @Test
- public void testEvaluateTwoIndexLeftJoinUnionFilter()
- throws AccumuloException, AccumuloSecurityException,
- TableExistsException, RepositoryException, MalformedQueryException,
- SailException, QueryEvaluationException, TableNotFoundException,
- TupleQueryResultHandlerException, RyaDAOException, PcjException, InferenceEngineException,
- NumberFormatException, UnknownHostException {
-
- conn.add(obj, RDFS.LABEL, new LiteralImpl("label"));
- conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2"));
- conn.add(sub, RDF.TYPE, obj);
- conn.add(sub2, RDF.TYPE, obj2);
-
- final URI livesIn = new URIImpl("uri:livesIn");
- final URI city = new URIImpl("uri:city");
- final URI city2 = new URIImpl("uri:city2");
- final URI city3 = new URIImpl("uri:city3");
- conn.add(sub, livesIn, city);
- conn.add(sub2, livesIn, city2);
- conn.add(sub2, livesIn, city3);
- conn.add(sub, livesIn, city3);
-
- final String indexSparqlString = ""//
- + "SELECT ?e ?l ?o " //
- + "{" //
- + " ?e a ?o . "//
- + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String indexSparqlString2 = ""//
- + "SELECT ?e ?l ?o " //
- + "{" //
- + " ?e <uri:talksTo> ?o . "//
- + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "//
- + "}";//
-
- final String queryString = ""//
- + "SELECT ?c ?e ?l ?o " //
- + "{" //
- + " Filter(?c = <uri:city3>) " //
- + " ?e <uri:livesIn> ?c . "//
- + " OPTIONAL{{ ?e a ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"//
- + " UNION { ?e <uri:talksTo> ?o . ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l }}"//
- + "}";//
-
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_1", indexSparqlString, new String[] { "e", "l", "o" },
- Optional.<PcjVarOrderFactory> absent());
- PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix
- + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" },
- Optional.<PcjVarOrderFactory> absent());
-
- PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix);
- PcjIntegrationTestingUtil.closeAndShutdown(conn, repo);
- repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance");
- conn = repo.getConnection();
- conn.add(sub2, livesIn, city3);
- conn.add(sub, livesIn, city3);
-
- final CountingResultHandler crh = new CountingResultHandler();
- pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(
- crh);
-
- Assert.assertEquals(6, crh.getCount());
-
- }
-
- public static class CountingResultHandler implements
- TupleQueryResultHandler {
- private int count = 0;
-
- public int getCount() {
- return count;
- }
-
- public void resetCount() {
- count = 0;
- }
-
- @Override
- public void startQueryResult(final List<String> arg0)
- throws TupleQueryResultHandlerException {
- }
-
- @Override
- public void handleSolution(final BindingSet arg0)
- throws TupleQueryResultHandlerException {
- System.out.println(arg0);
- count++;
- System.out.println("Count is " + count);
- }
-
- @Override
- public void endQueryResult() throws TupleQueryResultHandlerException {
- }
-
- @Override
- public void handleBoolean(final boolean arg0)
- throws QueryResultHandlerException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void handleLinks(final List<String> arg0)
- throws QueryResultHandlerException {
- // TODO Auto-generated method stub
-
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
new file mode 100644
index 0000000..7e410e0
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.NumericLiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Performs integration test using {@link MiniAccumuloCluster} to ensure the
+ * functions of {@link PcjTables} work within a cluster setting.
+ */
+public class PcjTablesIT {
+
+ private static final String USE_MOCK_INSTANCE = ".useMockInstance";
+ private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
+ private static final String CLOUDBASE_USER = "sc.cloudbase.username";
+ private static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
+
+ private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
+ // The MiniAccumuloCluster is re-used between tests.
+ private MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance();
+
+ // Rya data store and connections.
+ protected RyaSailRepository ryaRepo = null;
+ protected RepositoryConnection ryaConn = null;
+
+ @Rule
+ public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
+
+ @BeforeClass
+ public static void killLoudLogs() {
+ Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
+ }
+
+ @Before
+ public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException {
+ // Setup the Rya library to use the Mini Accumulo.
+ ryaRepo = setupRya();
+ ryaConn = ryaRepo.getConnection();
+ }
+
+ @After
+ public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException {
+ // Stop Rya.
+ if (ryaRepo != null) {
+ ryaRepo.shutDown();
+ }
+ }
+
+ private String getRyaInstanceName() {
+ return testInstance.getRyaInstanceName();
+ }
+
+ /**
+ * Format a Mini Accumulo to be a Rya repository.
+ *
+ * @return The Rya repository sitting on top of the Mini Accumulo.
+ */
+ private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException {
+ // Setup the Rya Repository that will be used to create Repository Connections.
+ final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+ final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
+ crdfdao.setConnector( cluster.getConnector() );
+
+ // Setup Rya configuration values.
+ final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ conf.setTablePrefix(getRyaInstanceName());
+ conf.setDisplayQueryPlan(true);
+
+ conf.setBoolean(USE_MOCK_INSTANCE, false);
+ conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName());
+ conf.set(CLOUDBASE_USER, cluster.getUsername());
+ conf.set(CLOUDBASE_PASSWORD, cluster.getPassword());
+ conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName());
+
+ crdfdao.setConf(conf);
+ ryaStore.setRyaDAO(crdfdao);
+
+ final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+ ryaRepo.initialize();
+
+ return ryaRepo;
+ }
+
+ /**
+ * Ensure that when a new PCJ table is created, it is initialized with the
+ * correct metadata values.
+ * <p>
+ * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)}
+ */
+ @Test
+ public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException {
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Create a PCJ table in the Mini Accumulo.
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+ // Fetch the PcjMetadata and ensure it has the correct values.
+ final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+
+ // Ensure the metadata matches the expected value.
+ final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
+ assertEquals(expected, pcjMetadata);
+ }
+
+ /**
+ * Ensure when results have been written to the PCJ table that they are in Accumulo.
+ * <p>
+ * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)}
+ */
+ @Test
+ public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Create a PCJ table in the Mini Accumulo.
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+ // Add a few results to the PCJ table.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+ pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(alice),
+ new VisibilityBindingSet(bob),
+ new VisibilityBindingSet(charlie)));
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Scan Accumulo for the stored results.
+ final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
+
+ // Ensure the expected results match those that were stored.
+ final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
+ expectedResults.putAll("name;age", results);
+ expectedResults.putAll("age;name", results);
+ assertEquals(expectedResults, fetchedResults);
+ }
+
+ @Test
+ public void listResults() throws Exception {
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Create a PCJ table in the Mini Accumulo.
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+ // Add a few results to the PCJ table.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(alice),
+ new VisibilityBindingSet(bob),
+ new VisibilityBindingSet(charlie)));
+
+ // Fetch the Binding Sets that have been stored in the PCJ table.
+ final Set<BindingSet> results = new HashSet<>();
+
+ final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations());
+ try {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ } finally {
+ resultsIt.close();
+ }
+
+ // Verify the fetched results match the expected ones.
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+ assertEquals(expected, results);
+ }
+
+ /**
+ * Ensure when results are already stored in Rya, that we are able to populate
+ * the PCJ table for a new SPARQL query using those results.
+ * <p>
+ * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)}
+ */
+ @Test
+ public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
+ // Load some Triples into Rya.
+ final Set<Statement> triples = new HashSet<>();
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+
+ for(final Statement triple : triples) {
+ ryaConn.add(triple);
+ }
+
+ // Create a PCJ table that will include those triples in its results.
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+ // Populate the PCJ table using a Rya connection.
+ pcjs.populatePcj(accumuloConn, pcjTableName, ryaConn);
+
+ // Scan Accumulo for the stored results.
+ final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Ensure the expected results match those that were stored.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+
+ final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
+ expectedResults.putAll("name;age", results);
+ expectedResults.putAll("age;name", results);
+ assertEquals(expectedResults, fetchedResults);
+ }
+
+ /**
+ * Ensure the method that creates a new PCJ table, scans Rya for matches, and
+ * stores them in the PCJ table works.
+ * <p>
+ * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)}
+ */
+ @Test
+ public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
+ // Load some Triples into Rya.
+ final Set<Statement> triples = new HashSet<>();
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+
+ for(final Statement triple : triples) {
+ ryaConn.add(triple);
+ }
+
+ // Create a PCJ table that will include those triples in its results.
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+
+ // Create and populate the PCJ table.
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent());
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Scan Accumulo for the stored results.
+ final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
+
+ // Ensure the expected results match those that were stored.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+
+ final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
+ expectedResults.putAll("name;age", results);
+ expectedResults.putAll("age;name", results);
+
+ assertEquals(expectedResults, fetchedResults);
+ }
+
+ @Test
+ public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Set up the table names that will be used.
+ final String instance1 = "instance1_";
+ final String instance2 = "instance2_";
+
+ final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1");
+ final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2");
+ final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3");
+
+ final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1");
+
+ // Create the PCJ Tables that are in instance 1 and instance 2.
+ final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
+ final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>";
+
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, instance1_table1, varOrders, sparql);
+ pcjs.createPcjTable(accumuloConn, instance1_table2, varOrders, sparql);
+ pcjs.createPcjTable(accumuloConn, instance1_table3, varOrders, sparql);
+
+ pcjs.createPcjTable(accumuloConn, instance2_table1, varOrders, sparql);
+
+ // Ensure all of the names have been stored for instance 1 and 2.
+ final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3);
+ final Set<String> instance1Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance1) );
+ assertEquals(expected1, instance1Tables);
+
+ final Set<String> expected2 = Sets.newHashSet(instance2_table1);
+ final Set<String> instance2Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance2) );
+ assertEquals(expected2, instance2Tables);
+ }
+
+ @Test
+ public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Create a PCJ table in the Mini Accumulo.
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+ // Add a few results to the PCJ table.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(alice),
+ new VisibilityBindingSet(bob),
+ new VisibilityBindingSet(charlie)));
+
+ // Make sure the cardinality was updated.
+ PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Purge the data.
+ pcjs.purgePcjTable(accumuloConn, pcjTableName);
+
+ // Make sure the cardinality was updated to 0.
+ metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+ assertEquals(0, metadata.getCardinality());
+ }
+
+ @Test
+ public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
+ final Connector accumuloConn = cluster.getConnector();
+
+ // Create a PCJ index.
+ final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj");
+ final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
+ final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>";
+
+ final PcjTables pcjs = new PcjTables();
+ pcjs.createPcjTable(accumuloConn, tableName, varOrders, sparql);
+
+ // Fetch its metadata to show that it has actually been created.
+ final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
+ PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
+ assertEquals(expectedMetadata, metadata);
+
+ // Drop it.
+ pcjs.dropPcjTable(accumuloConn, tableName);
+
+ // Show the metadata is no longer present.
+ PCJStorageException tableDoesNotExistException = null;
+ try {
+ metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
+ } catch(final PCJStorageException e) {
+ tableDoesNotExistException = e;
+ }
+ assertNotNull(tableDoesNotExistException);
+ }
+
+ /**
+ * Scan accumulo for the results that are stored in a PCJ table. The
+ * multimap stores a set of deserialized binding sets that were in the PCJ
+ * table for every variable order that is found in the PCJ metadata.
+ */
+ private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException {
+ final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
+
+ // Get the variable orders the data was written to.
+ final PcjTables pcjs = new PcjTables();
+ final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+
+ // Scan Accumulo for the stored results.
+ for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
+ final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+ scanner.fetchColumnFamily( new Text(varOrder.toString()) );
+
+ for(final Entry<Key, Value> entry : scanner) {
+ final byte[] serializedResult = entry.getKey().getRow().getBytes();
+ final BindingSet result = converter.convert(serializedResult, varOrder);
+ fetchedResults.put(varOrder.toString(), result);
+ }
+ }
+
+ return fetchedResults;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
deleted file mode 100644
index e43ab83..0000000
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.storage.accumulo;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
-import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.NumericLiteralImpl;
-import org.openrdf.model.impl.StatementImpl;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-/**
- * Performs integration test using {@link MiniAccumuloCluster} to ensure the
- * functions of {@link PcjTables} work within a cluster setting.
- */
-public class PcjTablesIntegrationTest {
-
- private static final String USE_MOCK_INSTANCE = ".useMockInstance";
- private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
- private static final String CLOUDBASE_USER = "sc.cloudbase.username";
- private static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
-
- private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-
- // The MiniAccumuloCluster is re-used between tests.
- private MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance();
-
- // Rya data store and connections.
- protected RyaSailRepository ryaRepo = null;
- protected RepositoryConnection ryaConn = null;
-
- @Rule
- public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
-
- @BeforeClass
- public static void killLoudLogs() {
- Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
- }
-
- @Before
- public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException {
- // Setup the Rya library to use the Mini Accumulo.
- ryaRepo = setupRya();
- ryaConn = ryaRepo.getConnection();
- }
-
- @After
- public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException {
- // Stop Rya.
- if (ryaRepo != null) {
- ryaRepo.shutDown();
- }
- }
-
- private String getRyaInstanceName() {
- return testInstance.getRyaInstanceName();
- }
-
- /**
- * Format a Mini Accumulo to be a Rya repository.
- *
- * @return The Rya repository sitting on top of the Mini Accumulo.
- */
- private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException {
- // Setup the Rya Repository that will be used to create Repository Connections.
- final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
- final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
- crdfdao.setConnector( cluster.getConnector() );
-
- // Setup Rya configuration values.
- final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix(getRyaInstanceName());
- conf.setDisplayQueryPlan(true);
-
- conf.setBoolean(USE_MOCK_INSTANCE, false);
- conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName());
- conf.set(CLOUDBASE_USER, cluster.getUsername());
- conf.set(CLOUDBASE_PASSWORD, cluster.getPassword());
- conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName());
-
- crdfdao.setConf(conf);
- ryaStore.setRyaDAO(crdfdao);
-
- final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
- ryaRepo.initialize();
-
- return ryaRepo;
- }
-
- /**
- * Ensure that when a new PCJ table is created, it is initialized with the
- * correct metadata values.
- * <p>
- * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)}
- */
- @Test
- public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException {
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- // Create a PCJ table in the Mini Accumulo.
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-
- // Fetch the PcjMetadata and ensure it has the correct values.
- final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
-
- // Ensure the metadata matches the expected value.
- final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
- assertEquals(expected, pcjMetadata);
- }
-
- /**
- * Ensure when results have been written to the PCJ table that they are in Accumulo.
- * <p>
- * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)}
- */
- @Test
- public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- // Create a PCJ table in the Mini Accumulo.
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-
- // Add a few results to the PCJ table.
- final MapBindingSet alice = new MapBindingSet();
- alice.addBinding("name", new URIImpl("http://Alice"));
- alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("name", new URIImpl("http://Bob"));
- bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("name", new URIImpl("http://Charlie"));
- charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
-
- final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
- pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
- new VisibilityBindingSet(alice),
- new VisibilityBindingSet(bob),
- new VisibilityBindingSet(charlie)));
-
- // Make sure the cardinality was updated.
- final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
- assertEquals(3, metadata.getCardinality());
-
- // Scan Accumulo for the stored results.
- final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
-
- // Ensure the expected results match those that were stored.
- final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
- expectedResults.putAll("name;age", results);
- expectedResults.putAll("age;name", results);
- assertEquals(expectedResults, fetchedResults);
- }
-
- @Test
- public void listResults() throws Exception {
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- // Create a PCJ table in the Mini Accumulo.
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-
- // Add a few results to the PCJ table.
- final MapBindingSet alice = new MapBindingSet();
- alice.addBinding("name", new URIImpl("http://Alice"));
- alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("name", new URIImpl("http://Bob"));
- bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("name", new URIImpl("http://Charlie"));
- charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
-
- pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
- new VisibilityBindingSet(alice),
- new VisibilityBindingSet(bob),
- new VisibilityBindingSet(charlie)));
-
- // Fetch the Binding Sets that have been stored in the PCJ table.
- final Set<BindingSet> results = new HashSet<>();
-
- final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations());
- try {
- while(resultsIt.hasNext()) {
- results.add( resultsIt.next() );
- }
- } finally {
- resultsIt.close();
- }
-
- // Verify the fetched results match the expected ones.
- final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie);
- assertEquals(expected, results);
- }
-
- /**
- * Ensure when results are already stored in Rya, that we are able to populate
- * the PCJ table for a new SPARQL query using those results.
- * <p>
- * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)}
- */
- @Test
- public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
- // Load some Triples into Rya.
- final Set<Statement> triples = new HashSet<>();
- triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
-
- for(final Statement triple : triples) {
- ryaConn.add(triple);
- }
-
- // Create a PCJ table that will include those triples in its results.
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-
- // Populate the PCJ table using a Rya connection.
- pcjs.populatePcj(accumuloConn, pcjTableName, ryaConn);
-
- // Scan Accumulo for the stored results.
- final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
-
- // Make sure the cardinality was updated.
- final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
- assertEquals(3, metadata.getCardinality());
-
- // Ensure the expected results match those that were stored.
- final MapBindingSet alice = new MapBindingSet();
- alice.addBinding("name", new URIImpl("http://Alice"));
- alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("name", new URIImpl("http://Bob"));
- bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("name", new URIImpl("http://Charlie"));
- charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
-
- final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
-
- final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
- expectedResults.putAll("name;age", results);
- expectedResults.putAll("age;name", results);
- assertEquals(expectedResults, fetchedResults);
- }
-
- /**
- * Ensure the method that creates a new PCJ table, scans Rya for matches, and
- * stores them in the PCJ table works.
- * <p>
- * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)}
- */
- @Test
- public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
- // Load some Triples into Rya.
- final Set<Statement> triples = new HashSet<>();
- triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
- triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
- triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
-
- for(final Statement triple : triples) {
- ryaConn.add(triple);
- }
-
- // Create a PCJ table that will include those triples in its results.
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
-
- // Create and populate the PCJ table.
- final PcjTables pcjs = new PcjTables();
- pcjs.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent());
-
- // Make sure the cardinality was updated.
- final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
- assertEquals(3, metadata.getCardinality());
-
- // Scan Accumulo for the stored results.
- final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
-
- // Ensure the expected results match those that were stored.
- final MapBindingSet alice = new MapBindingSet();
- alice.addBinding("name", new URIImpl("http://Alice"));
- alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("name", new URIImpl("http://Bob"));
- bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("name", new URIImpl("http://Charlie"));
- charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
-
- final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie);
-
- final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
- expectedResults.putAll("name;age", results);
- expectedResults.putAll("age;name", results);
-
- assertEquals(expectedResults, fetchedResults);
- }
-
- @Test
- public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
- final Connector accumuloConn = cluster.getConnector();
-
- // Set up the table names that will be used.
- final String instance1 = "instance1_";
- final String instance2 = "instance2_";
-
- final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1");
- final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2");
- final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3");
-
- final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1");
-
- // Create the PCJ Tables that are in instance 1 and instance 2.
- final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
- final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>";
-
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, instance1_table1, varOrders, sparql);
- pcjs.createPcjTable(accumuloConn, instance1_table2, varOrders, sparql);
- pcjs.createPcjTable(accumuloConn, instance1_table3, varOrders, sparql);
-
- pcjs.createPcjTable(accumuloConn, instance2_table1, varOrders, sparql);
-
- // Ensure all of the names have been stored for instance 1 and 2.
- final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3);
- final Set<String> instance1Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance1) );
- assertEquals(expected1, instance1Tables);
-
- final Set<String> expected2 = Sets.newHashSet(instance2_table1);
- final Set<String> instance2Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance2) );
- assertEquals(expected2, instance2Tables);
- }
-
- @Test
- public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
- final String sparql =
- "SELECT ?name ?age " +
- "{" +
- "FILTER(?age < 30) ." +
- "?name <http://hasAge> ?age." +
- "?name <http://playsSport> \"Soccer\" " +
- "}";
-
- final Connector accumuloConn = cluster.getConnector();
-
- // Create a PCJ table in the Mini Accumulo.
- final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
-
- // Add a few results to the PCJ table.
- final MapBindingSet alice = new MapBindingSet();
- alice.addBinding("name", new URIImpl("http://Alice"));
- alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
-
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("name", new URIImpl("http://Bob"));
- bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
-
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("name", new URIImpl("http://Charlie"));
- charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
-
- pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet(
- new VisibilityBindingSet(alice),
- new VisibilityBindingSet(bob),
- new VisibilityBindingSet(charlie)));
-
- // Make sure the cardinality was updated.
- PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
- assertEquals(3, metadata.getCardinality());
-
- // Purge the data.
- pcjs.purgePcjTable(accumuloConn, pcjTableName);
-
- // Make sure the cardinality was updated to 0.
- metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
- assertEquals(0, metadata.getCardinality());
- }
-
- @Test
- public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
- final Connector accumuloConn = cluster.getConnector();
-
- // Create a PCJ index.
- final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj");
- final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
- final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>";
-
- final PcjTables pcjs = new PcjTables();
- pcjs.createPcjTable(accumuloConn, tableName, varOrders, sparql);
-
- // Fetch its metadata to show that it has actually been created.
- final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
- PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
- assertEquals(expectedMetadata, metadata);
-
- // Drop it.
- pcjs.dropPcjTable(accumuloConn, tableName);
-
- // Show the metadata is no longer present.
- PCJStorageException tableDoesNotExistException = null;
- try {
- metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
- } catch(final PCJStorageException e) {
- tableDoesNotExistException = e;
- }
- assertNotNull(tableDoesNotExistException);
- }
-
- /**
- * Scan accumulo for the results that are stored in a PCJ table. The
- * multimap stores a set of deserialized binding sets that were in the PCJ
- * table for every variable order that is found in the PCJ metadata.
- */
- private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException {
- final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
-
- // Get the variable orders the data was written to.
- final PcjTables pcjs = new PcjTables();
- final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
-
- // Scan Accumulo for the stored results.
- for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
- final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
- scanner.fetchColumnFamily( new Text(varOrder.toString()) );
-
- for(final Entry<Key, Value> entry : scanner) {
- final byte[] serializedResult = entry.getKey().getRow().getBytes();
- final BindingSet result = converter.convert(serializedResult, varOrder);
- fetchedResults.put(varOrder.toString(), result);
- }
- }
-
- return fetchedResults;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java
----------------------------------------------------------------------
diff --git a/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java b/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java
deleted file mode 100644
index 4928886..0000000
--- a/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.apache.rya.camel.cbsail;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.apache.rya.camel.cbsail.CbSailComponent;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.test.CamelTestSupport;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.util.HashMap;
-
-public class CbSailIntegrationTest extends CamelTestSupport {
-
- @EndpointInject(uri = "cbsail:tquery?server=stratus13&port=2181&user=root&pwd=password&instanceName=stratus")
- ProducerTemplate producer;
-
- public void testCbSail() throws Exception {
- String underGradInfo = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>" +
- " PREFIX ub: <urn:test:onto:univ#>" +
- " SELECT * WHERE" +
- " {" +
- " <http://www.Department0.University0.edu/UndergraduateStudent600> ?pred ?obj ." +
- " }";
- HashMap map = new HashMap();
- map.put(CbSailComponent.SPARQL_QUERY_PROP, underGradInfo);
- map.put(CbSailComponent.START_TIME_QUERY_PROP, 0l);
- map.put(CbSailComponent.TTL_QUERY_PROP, 86400000l);
- Object o = producer.requestBodyAndHeaders(null, map);
- System.out.println(o);
- Thread.sleep(100000);
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
-
- @Override
- public void configure() throws Exception {
- ValueFactory vf = new ValueFactoryImpl();
- String underGradInfo = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>" +
- " PREFIX ub: <urn:test:onto:univ#>" +
- " SELECT * WHERE" +
- " {" +
- " <http://www.Department0.University0.edu/UndergraduateStudent60> ?pred ?obj ." +
- " }";
- String rawEvents = "PREFIX nh: <http://org.apache.com/2011/02/nh#>\n" +
- " SELECT * WHERE\n" +
- " {\n" +
- " ?uuid nh:timestamp ?timestamp.\n" +
- " ?uuid nh:site ?site;\n" +
- " nh:system ?system;\n" +
- " nh:dataSupplier ?dataSupplier;\n" +
- " nh:dataType ?dataType;\n" +
- " <http://org.apache.com/2011/02/nh#count> ?data.\n" +
- " } LIMIT 100";
- String latestModels = "PREFIX nh: <http://org.apache.com/rdf/2011/02/model#>" +
- " PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>" +
- " SELECT * WHERE" +
- " {" +
- " ?modelUuid nh:dayOfWeek \"5\";" +
- " nh:hourOfDay \"3\";" +
- " nh:timestamp ?timestamp;" +
-// " FILTER (xsd:integer(?timestamp) > 1297652964633)." +
- " nh:dataProperty \"count\";" +
- " nh:modelType \"org.apache.learning.tpami.SimpleGaussianMMModel\";" +
- " nh:site ?site;" +
- " nh:dataSupplier ?dataSupplier;" +
- " nh:system ?system;" +
- " nh:dataType ?dataType;" +
- " nh:model ?model;" +
- " nh:key ?key." +
- " }";
-
- from("timer://foo?fixedRate=true&period=60000").
- setHeader(CbSailComponent.SPARQL_QUERY_PROP, constant(underGradInfo)).
-// setBody(constant(new StatementImpl(vf.createURI("http://www.Department0.University0.edu/UndergraduateStudent610"), vf.createURI("urn:test:onto:univ#testPred"), vf.createLiteral("test")))).
- to("cbsail:tquery?server=stratus13&port=2181&user=root&pwd=password&instanceName=stratus&queryOutput=XML" +
-// "&ttl=259200000"
-// + "&sparql=" + latestModels" +
- "").process(new Processor() {
-
- @Override
- public void process(Exchange exchange) throws Exception {
- System.out.println(exchange.getIn().getBody());
-// if (body != null)
-// System.out.println(body.size());
- }
- }).end();
- }
- };
- }
-
-}