You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:22 UTC
[4/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had
Aggregation support added to it. Also fixed a bunch of resource leaks that
were causing integration tests to fail. Closes #156.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 3bb54c4..3a42a23 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -24,58 +24,34 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.junit.Test;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.mini.MiniFluo;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
/**
* Tests the methods of {@link CountStatements}.
*/
-public class CountStatementsIT extends ITBase {
+public class CountStatementsIT extends RyaExportITBase {
/**
* Overridden so that no Observers will be started. This ensures whatever
* statements are inserted as part of the test will not be consumed.
- *
- * @return A Mini Fluo cluster.
*/
@Override
- protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
+ protected void preFluoInitHook() throws Exception {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
- // Configure how the mini fluo will run.
- final FluoConfiguration config = new FluoConfiguration();
- config.setMiniStartAccumulo(false);
- config.setAccumuloInstance(instanceName);
- config.setAccumuloUser(ACCUMULO_USER);
- config.setAccumuloPassword(ACCUMULO_PASSWORD);
- config.setInstanceZookeepers(zookeepers + "/fluo");
- config.setAccumuloZookeepers(zookeepers);
-
- config.setApplicationName(FLUO_APP_NAME);
- config.setAccumuloTable("fluo" + FLUO_APP_NAME);
-
- config.addObservers(observers);
-
- FluoFactory.newAdmin(config).initialize(
- new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
- final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
- return miniFluo;
+ // Add the observers to the Fluo Configuration.
+ super.getFluoConfiguration().addObservers(observers);
}
-
@Test
public void countStatements() {
// Insert some Triples into the Fluo app.
@@ -86,12 +62,14 @@ public class CountStatementsIT extends ITBase {
triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
- new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
- // Load some statements into the Fluo app.
- final BigInteger count = new CountStatements().countStatements(fluoClient);
+ // Load some statements into the Fluo app.
+ final BigInteger count = new CountStatements().countStatements(fluoClient);
- // Ensure the count matches the expected values.
- assertEquals(BigInteger.valueOf(5), count);
+ // Ensure the count matches the expected values.
+ assertEquals(BigInteger.valueOf(5), count);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 82b61bd..0aceaa3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -26,8 +26,11 @@ import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -47,7 +50,7 @@ import com.google.common.collect.Sets;
/**
* Integration tests the methods of {@link GetPcjMetadata}.
*/
-public class GetPcjMetadataIT extends ITBase {
+public class GetPcjMetadataIT extends RyaExportITBase {
@Test
public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException {
@@ -59,56 +62,60 @@ public class GetPcjMetadataIT extends ITBase {
"}";
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
- // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
- final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
- final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
+ // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
+ final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
+ final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId);
- // Ensure the command returns the correct metadata.
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
- final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
- assertEquals(expected, metadata);
+ // Ensure the command returns the correct metadata.
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+ final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
+ assertEquals(expected, metadata);
+ }
}
@Test
public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException {
-
- final CreatePcj createPcj = new CreatePcj();
-
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
- // Add a couple of queries to Accumulo.
- final String q1Sparql =
- "SELECT ?x " +
- "WHERE { " +
- "?x <http://talksTo> <http://Eve>. " +
- "?x <http://worksAt> <http://Chipotle>." +
- "}";
- final String q1PcjId = pcjStorage.createPcj(q1Sparql);
- createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
- final String q2Sparql =
- "SELECT ?x ?y " +
- "WHERE { " +
- "?x <http://talksTo> ?y. " +
- "?y <http://worksAt> <http://Chipotle>." +
- "}";
- final String q2PcjId = pcjStorage.createPcj(q2Sparql);
- createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
- // Ensure the command returns the correct metadata.
- final Set<PcjMetadata> expected = new HashSet<>();
- final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql);
- final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql);
- expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
- expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
-
- final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
- assertEquals(expected, Sets.newHashSet( metadata.values() ));
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Add a couple of queries to Accumulo.
+ final String q1Sparql =
+ "SELECT ?x " +
+ "WHERE { " +
+ "?x <http://talksTo> <http://Eve>. " +
+ "?x <http://worksAt> <http://Chipotle>." +
+ "}";
+ final String q1PcjId = pcjStorage.createPcj(q1Sparql);
+ final CreatePcj createPcj = new CreatePcj();
+ createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+ final String q2Sparql =
+ "SELECT ?x ?y " +
+ "WHERE { " +
+ "?x <http://talksTo> ?y. " +
+ "?y <http://worksAt> <http://Chipotle>." +
+ "}";
+ final String q2PcjId = pcjStorage.createPcj(q2Sparql);
+ createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+ // Ensure the command returns the correct metadata.
+ final Set<PcjMetadata> expected = new HashSet<>();
+ final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql);
+ final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql);
+ expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders));
+ expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders));
+
+ final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+ assertEquals(expected, Sets.newHashSet( metadata.values() ));
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 85c31a0..10f2319 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -26,8 +26,12 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -42,7 +46,7 @@ import com.google.common.collect.Sets;
/**
* Integration tests the methods of {@link GetQueryReport}.
*/
-public class GetQueryReportIT extends ITBase {
+public class GetQueryReportIT extends RyaExportITBase {
@Test
public void getReport() throws Exception {
@@ -56,69 +60,72 @@ public class GetQueryReportIT extends ITBase {
// Triples that will be streamed into Fluo after the PCJ has been created.
final Set<RyaStatement> streamedTriples = Sets.newHashSet(
- makeRyaStatement("http://Alice", "http://worksAt", "http://Taco Shop"),
- makeRyaStatement("http://Alice", "http://worksAt", "http://Burger Join"),
- makeRyaStatement("http://Alice", "http://worksAt", "http://Pastery Shop"),
- makeRyaStatement("http://Alice", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://Alice", "http://livesIn", "http://Lost County"),
- makeRyaStatement("http://Alice", "http://livesIn", "http://Big City"),
- makeRyaStatement("http://Bob", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://Bob", "http://livesIn", "http://Big City"),
- makeRyaStatement("http://Charlie", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://Charlie", "http://livesIn", "http://Big City"),
- makeRyaStatement("http://David", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://David", "http://livesIn", "http://Lost County"),
- makeRyaStatement("http://Eve", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://Eve", "http://livesIn", "http://Big City"),
- makeRyaStatement("http://Frank", "http://worksAt", "http://Burrito Place"),
- makeRyaStatement("http://Frank", "http://livesIn", "http://Lost County"));
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Taco Shop")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burger Join")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Pastery Shop")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+ new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+ new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+ new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://David"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")),
+ new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")),
+ new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")),
+ new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")));
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
- // Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+ // Stream the data into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
- // Wait for the results to finish processing.
- fluo.waitForObservers();
+ // Wait for the results to finish processing.
+ super.getMiniFluo().waitForObservers();
- // Fetch the report.
- final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
- final Set<String> queryIds = metadata.keySet();
- assertEquals(1, queryIds.size());
- final String queryId = queryIds.iterator().next();
+ // Fetch the report.
+ final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient);
+ final Set<String> queryIds = metadata.keySet();
+ assertEquals(1, queryIds.size());
+ final String queryId = queryIds.iterator().next();
- final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId);
+ final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId);
- // Build the expected counts map.
- final Map<String, BigInteger> expectedCounts = new HashMap<>();
+ // Build the expected counts map.
+ final Map<String, BigInteger> expectedCounts = new HashMap<>();
- final FluoQuery fluoQuery = report.getFluoQuery();
+ final FluoQuery fluoQuery = report.getFluoQuery();
- final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
- expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
+ final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
+ expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
- final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
- expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
+ final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
+ expectedCounts.put(filterNodeId, BigInteger.valueOf(8));
- final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId();
- expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
+ final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId();
+ expectedCounts.put(joinNodeId, BigInteger.valueOf(13));
- final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator();
- final StatementPatternMetadata sp1 = patterns.next();
- final StatementPatternMetadata sp2 = patterns.next();
- if(sp1.getStatementPattern().contains("http://worksAt")) {
- expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
- expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
- } else {
- expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
- expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
- }
+ final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator();
+ final StatementPatternMetadata sp1 = patterns.next();
+ final StatementPatternMetadata sp2 = patterns.next();
+ if(sp1.getStatementPattern().contains("http://worksAt")) {
+ expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9));
+ expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7));
+ } else {
+ expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9));
+ expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7));
+ }
- assertEquals(expectedCounts, report.getCounts());
+ assertEquals(expectedCounts, report.getCounts());
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index 19bc272..ec301ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -26,17 +26,18 @@ import java.util.List;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.junit.Test;
import com.beust.jcommander.internal.Lists;
-import org.apache.fluo.api.client.Transaction;
-
/**
* Integration tests the methods of {@link ListQueryIds}.
*/
-public class ListQueryIdsIT extends ITBase {
+public class ListQueryIdsIT extends RyaExportITBase {
/**
* This test ensures that when there are PCJ tables in Accumulo as well as
@@ -45,18 +46,20 @@ public class ListQueryIdsIT extends ITBase {
*/
@Test
public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException {
- // Store a few SPARQL/Query ID pairs in the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- tx.set("SPARQL_3", QUERY_ID, "ID_3");
- tx.set("SPARQL_1", QUERY_ID, "ID_1");
- tx.set("SPARQL_4", QUERY_ID, "ID_4");
- tx.set("SPARQL_2", QUERY_ID, "ID_2");
- tx.commit();
- }
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Store a few SPARQL/Query ID pairs in the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ tx.set("SPARQL_3", QUERY_ID, "ID_3");
+ tx.set("SPARQL_1", QUERY_ID, "ID_1");
+ tx.set("SPARQL_4", QUERY_ID, "ID_4");
+ tx.set("SPARQL_2", QUERY_ID, "ID_2");
+ tx.commit();
+ }
- // Ensure the correct list of Query IDs is retured.
- final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4");
- final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
- assertEquals(expected, queryIds);
+ // Ensure the correct list of Query IDs is returned.
+ final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4");
+ final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient);
+ assertEquals(expected, queryIds);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index d5ed447..082f46d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,7 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -30,13 +36,10 @@ import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.repository.RepositoryException;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-
/**
* Integration tests the methods of {@link FluoQueryMetadataDAO}.
*/
-public class FluoQueryMetadataDAOIT extends ITBase {
+public class FluoQueryMetadataDAOIT extends RyaExportITBase {
@Test
public void statementPatternMetadataTest() throws RepositoryException {
@@ -49,20 +52,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
builder.setParentNodeId("parentNodeId");
final StatementPatternMetadata originalMetadata = builder.build();
- // Write it to the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- dao.write(tx, originalMetadata);
- tx.commit();
- }
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
- // Read it from the Fluo table.
- StatementPatternMetadata storedMetadata = null;
- try(Snapshot sx = fluoClient.newSnapshot()) {
- storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
- }
+ // Read it from the Fluo table.
+ StatementPatternMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId");
+ }
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalMetadata, storedMetadata);
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
+ }
}
@Test
@@ -78,20 +83,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
builder.setFilterIndexWithinSparql(2);
final FilterMetadata originalMetadata = builder.build();
- // Write it to the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- dao.write(tx, originalMetadata);
- tx.commit();
- }
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
- // Read it from the Fluo table.
- FilterMetadata storedMetadata = null;
- try(Snapshot sx = fluoClient.newSnapshot()) {
- storedMetadata = dao.readFilterMetadata(sx, "nodeId");
- }
+ // Read it from the Fluo table.
+ FilterMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readFilterMetadata(sx, "nodeId");
+ }
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalMetadata, storedMetadata);
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
+ }
}
@Test
@@ -107,20 +114,22 @@ public class FluoQueryMetadataDAOIT extends ITBase {
builder.setRightChildNodeId("rightChildNodeId");
final JoinMetadata originalMetadata = builder.build();
- // Write it to the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- dao.write(tx, originalMetadata);
- tx.commit();
- }
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
- // Read it from the Fluo table.
- JoinMetadata storedMetadata = null;
- try(Snapshot sx = fluoClient.newSnapshot()) {
- storedMetadata = dao.readJoinMetadata(sx, "nodeId");
- }
+ // Read it from the Fluo table.
+ JoinMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readJoinMetadata(sx, "nodeId");
+ }
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalMetadata, storedMetadata);
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
+ }
}
@Test
@@ -134,20 +143,85 @@ public class FluoQueryMetadataDAOIT extends ITBase {
builder.setChildNodeId("childNodeId");
final QueryMetadata originalMetadata = builder.build();
- // Write it to the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- dao.write(tx, originalMetadata);
- tx.commit();
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ QueryMetadata storedMetdata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetdata);
}
+ }
+
+ @Test
+ public void aggregationMetadataTest_withGroupByVarOrders() {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
+ .setVariableOrder(new VariableOrder("totalCount"))
+ .setParentNodeId("parentNodeId")
+ .setChildNodeId("childNodeId")
+ .setGroupByVariableOrder(new VariableOrder("a", "b", "c"))
+ .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
+ .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+ .build();
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
- // Read it from the Fluo table.
- QueryMetadata storedMetdata = null;
- try(Snapshot sx = fluoClient.newSnapshot()) {
- storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+ // Read it from the Fluo table.
+ AggregationMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
}
+ }
+
+ @Test
+ public void aggregationMetadataTest_noGroupByVarOrders() {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
+ .setVariableOrder(new VariableOrder("totalCount"))
+ .setParentNodeId("parentNodeId")
+ .setChildNodeId("childNodeId")
+ .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
+ .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice"))
+ .build();
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ AggregationMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
+ }
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalMetadata, storedMetdata);
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
+ }
}
@Test
@@ -168,19 +242,21 @@ public class FluoQueryMetadataDAOIT extends ITBase {
final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
- // Write it to the Fluo table.
- try(Transaction tx = fluoClient.newTransaction()) {
- dao.write(tx, originalQuery);
- tx.commit();
- }
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalQuery);
+ tx.commit();
+ }
- // Read it from the Fluo table.
- FluoQuery storedQuery = null;
- try(Snapshot sx = fluoClient.newSnapshot()) {
- storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
- }
+ // Read it from the Fluo table.
+ FluoQuery storedQuery = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+ }
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalQuery, storedQuery);
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalQuery, storedQuery);
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index b4c8d69..21d7db0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -18,100 +18,152 @@
*/
package org.apache.rya.indexing.pcj.fluo.integration;
+import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.junit.Test;
import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
import com.google.common.collect.Sets;
-public class CreateDeleteIT extends ITBase {
+/**
+ * Tests that ensure the PCJ delete support works.
+ */
+public class CreateDeleteIT extends RyaExportITBase {
- /**
- * Ensure historic matches are included in the result.
- */
@Test
- public void historicResults() throws Exception {
+ public void deletePCJ() throws Exception {
// A query that finds people who talk to Eve and work at Chipotle.
- final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
- + "?x <http://worksAt> <http://Chipotle>." + "}";
+ final String sparql =
+ "SELECT ?x " + "WHERE { " +
+ "?x <http://talksTo> <http://Eve>. " +
+ "?x <http://worksAt> <http://Chipotle>." +
+ "}";
// Triples that are loaded into Rya before the PCJ is created.
- final Set<Statement> historicTriples = Sets.newHashSet(
- makeStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeStatement("http://Bob", "http://talksTo", "http://Eve"),
- makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
-
- makeStatement("http://Eve", "http://helps", "http://Kevin"),
-
- makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
- makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
- makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
- makeStatement("http://David", "http://worksAt", "http://Chipotle"));
-
- // The expected results of the SPARQL query once the PCJ has been
- // computed.
- final Set<BindingSet> expected = new HashSet<>();
- expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob"))));
- expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie"))));
-
- // Load the historic data into Rya.
- for (final Statement triple : historicTriples) {
- ryaConn.add(triple);
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Set<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+
+ vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")),
+
+ vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
+
+ // Create the PCJ in Fluo and load the statements into Rya.
+ final String pcjId = loadData(sparql, statements);
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Ensure the data was loaded.
+ final List<Bytes> rows = getFluoTableEntries(fluoClient);
+ assertEquals(17, rows.size());
+
+ // Delete the PCJ from the Fluo application.
+ new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+ // Ensure all data related to the query has been removed.
+ final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+ assertEquals(0, empty_rows.size());
}
+ }
+
+ @Test
+ public void deleteAggregation() throws Exception {
+ // A query that finds the maximum price for an item within the inventory.
+ final String sparql =
+ "SELECT (max(?price) as ?maxPrice) { " +
+ "?item <urn:price> ?price . " +
+ "}";
+
+ // Create the Statements that will be loaded into Rya.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)),
+ vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)),
+ vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+ // Create the PCJ in Fluo and load the statements into Rya.
+ final String pcjId = loadData(sparql, statements);
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Ensure the data was loaded.
+ final List<Bytes> rows = getFluoTableEntries(fluoClient);
+ assertEquals(10, rows.size());
+
+ // Delete the PCJ from the Fluo application.
+ new DeletePcj(1).deletePcj(fluoClient, pcjId);
+
+ // Ensure all data related to the query has been removed.
+ final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
+ assertEquals(0, empty_rows.size());
+ }
+ }
- // Create the PCJ table.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
- final String pcjId = pcjStorage.createPcj(sparql);
+ private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
+ requireNonNull(sparql);
+ requireNonNull(statements);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ // Register the PCJ with Rya.
+ final Instance accInstance = super.getAccumuloConnector().getInstance();
+ final Connector accumuloConn = super.getAccumuloConnector();
- // Verify the end results of the query match the expected results.
- fluo.waitForObservers();
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
+ ACCUMULO_USER,
+ ACCUMULO_PASSWORD.toCharArray(),
+ accInstance.getInstanceName(),
+ accInstance.getZooKeepers()), accumuloConn);
- final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
- List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(17, rows.size());
+ // Write the data to Rya.
+ final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
+ ryaConn.begin();
+ ryaConn.add(statements);
+ ryaConn.commit();
+ ryaConn.close();
- // Delete the PCJ from the Fluo application.
- new DeletePcj(1).deletePcj(fluoClient, pcjId);
+ // Wait for the Fluo application to finish computing the end result.
+ super.getMiniFluo().waitForObservers();
- // Ensure all data related to the query has been removed.
- List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ // Return the ID of the PCJ that was created so the caller can reference it (e.g. to delete it).
+ return pcjId;
}
- private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
+ private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) {
try (Snapshot snapshot = fluoClient.newSnapshot()) {
- List<Bytes> rows = new ArrayList<>();
- RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
+ final List<Bytes> rows = new ArrayList<>();
+ final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
- for(ColumnScanner cscanner: rscanner) {
+ for(final ColumnScanner cscanner: rscanner) {
rows.add(cscanner.getRow());
}
-
+
return rows;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index dcab997..ab97bbd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -19,32 +19,37 @@
package org.apache.rya.indexing.pcj.fluo.integration;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import java.util.HashSet;
import java.util.Set;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.junit.Test;
import org.openrdf.model.Statement;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepositoryConnection;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
/**
* Performs integration tests over the Fluo application geared towards various types of input.
- * <p>
- * These tests are being ignore so that they will not run as unit tests while building the application.
*/
-public class InputIT extends ITBase {
+public class InputIT extends RyaExportITBase {
/**
* Ensure historic matches are included in the result.
@@ -53,49 +58,64 @@ public class InputIT extends ITBase {
public void historicResults() throws Exception {
// A query that finds people who talk to Eve and work at Chipotle.
final String sparql =
- "SELECT ?x " +
- "WHERE { " +
+ "SELECT ?x WHERE { " +
"?x <http://talksTo> <http://Eve>. " +
"?x <http://worksAt> <http://Chipotle>." +
"}";
// Triples that are loaded into Rya before the PCJ is created.
+ final ValueFactory vf = new ValueFactoryImpl();
final Set<Statement> historicTriples = Sets.newHashSet(
- makeStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeStatement("http://Bob", "http://talksTo", "http://Eve"),
- makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
- makeStatement("http://Eve", "http://helps", "http://Kevin"),
+ vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")),
- makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
- makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
- makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
- makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+ vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")),
+ vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expected = new HashSet<>();
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Bob"))));
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Bob"));
+ expected.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Charlie"));
+ expected.add(bs);
// Load the historic data into Rya.
+ final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
for(final Statement triple : historicTriples) {
ryaConn.add(triple);
}
+ ryaConn.close();
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+ // Verify the end results of the query match the expected results.
+ super.getMiniFluo().waitForObservers();
- // Verify the end results of the query match the expected results.
- fluo.waitForObservers();
+ final Set<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
- final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ assertEquals(expected, results);
+ }
}
/**
@@ -105,51 +125,67 @@ public class InputIT extends ITBase {
public void streamedResults() throws Exception {
// A query that finds people who talk to Eve and work at Chipotle.
final String sparql =
- "SELECT ?x " +
- "WHERE { " +
+ "SELECT ?x WHERE { " +
"?x <http://talksTo> <http://Eve>. " +
"?x <http://worksAt> <http://Chipotle>." +
"}";
// Triples that will be streamed into Fluo after the PCJ has been created.
final Set<RyaStatement> streamedTriples = Sets.newHashSet(
- makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeRyaStatement("http://Bob", "http://talksTo", "http://Eve"),
- makeRyaStatement("http://Charlie", "http://talksTo", "http://Eve"),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+ new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+ new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
- makeRyaStatement("http://Eve", "http://helps", "http://Kevin"),
+ new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://helps"), new RyaURI("http://Kevin")),
- makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"),
- makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
- makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"),
- makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"));
+ new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+ new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+ new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")),
+ new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
// The expected results of the SPARQL query once the PCJ has been computed.
+ final ValueFactory vf = new ValueFactoryImpl();
final Set<BindingSet> expected = new HashSet<>();
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Bob"))));
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Bob"));
+ expected.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Charlie"));
+ expected.add(bs);
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
- // Ensure the query has no results yet.
- fluo.waitForObservers();
- Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
- assertTrue( results.isEmpty() );
+ // Ensure the query has no results yet.
+ super.getMiniFluo().waitForObservers();
- // Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ assertFalse( resultsIt.hasNext() );
+ }
- // Verify the end results of the query match the expected results.
- fluo.waitForObservers();
- results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ // Stream the data into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+ // Verify the end results of the query match the expected results.
+ super.getMiniFluo().waitForObservers();
+
+ final HashSet<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
+
+ assertEquals(expected, results);
+ }
}
/**
@@ -162,53 +198,75 @@ public class InputIT extends ITBase {
public void historicThenStreamedResults() throws Exception {
// A query that finds people who talk to Eve and work at Chipotle.
final String sparql =
- "SELECT ?x " +
- "WHERE { " +
+ "SELECT ?x WHERE { " +
"?x <http://talksTo> <http://Eve>. " +
"?x <http://worksAt> <http://Chipotle>." +
"}";
// Triples that are loaded into Rya before the PCJ is created.
+ final ValueFactory vf = new ValueFactoryImpl();
final Set<Statement> historicTriples = Sets.newHashSet(
- makeStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
// Triples that will be streamed into Fluo after the PCJ has been created.
final Set<RyaStatement> streamedTriples = Sets.newHashSet(
- makeRyaStatement("http://Frank", "http://talksTo", "http://Eve"),
- makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+ new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+ new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
// Load the historic data into Rya.
+ final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
for(final Statement triple : historicTriples) {
ryaConn.add(triple);
}
+ ryaConn.close();
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
- // Ensure Alice is a match.
- fluo.waitForObservers();
- final Set<BindingSet> expected = new HashSet<>();
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Alice"))));
+ // Ensure Alice is a match.
+ super.getMiniFluo().waitForObservers();
- Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ final Set<BindingSet> expected = new HashSet<>();
- // Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Alice"));
+ expected.add(bs);
- // Verify the end results of the query also include Frank.
- fluo.waitForObservers();
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Frank"))));
+ Set<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add(resultsIt.next());
+ }
+ }
- results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ assertEquals(expected, results);
+
+ // Stream the data into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+ // Verify the end results of the query also include Frank.
+ super.getMiniFluo().waitForObservers();
+
+ bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Frank"));
+ expected.add(bs);
+
+ results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add(resultsIt.next());
+ }
+ }
+
+ assertEquals(expected, results);
+ }
}
/**
@@ -222,50 +280,69 @@ public class InputIT extends ITBase {
public void historicAndStreamConflict() throws Exception {
// A query that finds people who talk to Eve and work at Chipotle.
final String sparql =
- "SELECT ?x " +
- "WHERE { " +
+ "SELECT ?x WHERE { " +
"?x <http://talksTo> <http://Eve>. " +
"?x <http://worksAt> <http://Chipotle>." +
"}";
// Triples that are loaded into Rya before the PCJ is created.
+ final ValueFactory vf = new ValueFactoryImpl();
final Set<Statement> historicTriples = Sets.newHashSet(
- makeStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),
+ vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")));
// Triples that will be streamed into Fluo after the PCJ has been created.
final Set<RyaStatement> streamedTriples = Sets.newHashSet(
- makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
- makeRyaStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")),
+ new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")));
// The expected final result.
final Set<BindingSet> expected = new HashSet<>();
- expected.add(makeBindingSet(
- new BindingImpl("x", new URIImpl("http://Alice"))));
+
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("x", vf.createURI("http://Alice"));
+ expected.add(bs);
// Load the historic data into Rya.
+ final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
for(final Statement triple : historicTriples) {
ryaConn.add(triple);
}
+ ryaConn.close();
// Create the PCJ table.
+ final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
final String pcjId = pcjStorage.createPcj(sparql);
- // Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
-
- // Ensure Alice is a match.
- fluo.waitForObservers();
- Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
-
- // Stream the same Alice triple into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
-
- // Verify the end results of the query is stiill only Alice.
- fluo.waitForObservers();
- results = getQueryBindingSetValues(fluoClient, sparql);
- assertEquals(expected, results);
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+ // Ensure Alice is a match.
+ super.getMiniFluo().waitForObservers();
+
+ Set<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
+ assertEquals(expected, results);
+
+ // Stream the same Alice triple into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
+
+ // Verify the end results of the query are still only Alice.
+ super.getMiniFluo().waitForObservers();
+
+ results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
+ assertEquals(expected, results);
+ }
}
}
\ No newline at end of file