You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:27 UTC
[2/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index 7a73b41..f44db6c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -25,10 +25,12 @@ import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem;
@@ -56,26 +58,41 @@ public class QueryReportRenderer {
final FluoQuery metadata = queryReport.getFluoQuery();
- switch (metadata.getQueryType()) {
- case Projection:
- final QueryMetadata queryMetadata = metadata.getQueryMetadata().get();
- builder.appendItem(new ReportItem("QUERY NODE"));
- builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
- builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
- builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql())));
- builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
- builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
- break;
- case Construct:
+ QueryMetadata queryMetadata = metadata.getQueryMetadata();
+ builder.appendItem( new ReportItem("") );
+
+ builder.appendItem(new ReportItem("QUERY NODE"));
+ builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
+ builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
+ builder.appendItem( new ReportItem("SPARQL", queryMetadata.getSparql()) );
+ builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
+ builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
+
+
+
+ if (metadata.getQueryType() == QueryType.Construct) {
+ builder.appendItem( new ReportItem("") );
+
final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE"));
builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId()));
builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString()));
- builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql())));
+ builder.appendItem( new ReportItem("Parent Node ID", constructMetadata.getParentNodeId()) );
builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId()));
builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString()));
builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId())));
}
+
+ for (ProjectionMetadata projectionMetadata : metadata.getProjectionMetadata()) {
+ builder.appendItem( new ReportItem("") );
+
+ builder.appendItem(new ReportItem("PROJECTION NODE"));
+ builder.appendItem(new ReportItem("Node ID", projectionMetadata.getNodeId()));
+ builder.appendItem(new ReportItem("Variable Order", projectionMetadata.getVariableOrder().toString()));
+ builder.appendItem( new ReportItem("Parent Node ID", projectionMetadata.getParentNodeId()) );
+ builder.appendItem(new ReportItem("Child Node ID", projectionMetadata.getChildNodeId()));
+ builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(projectionMetadata.getNodeId())));
+ }
for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) {
builder.appendItem( new ReportItem("") );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index c8dc737..f25b573 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -33,7 +33,7 @@ import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -179,7 +179,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain it.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
} catch (MalformedQueryException | PcjException | RyaDAOException e) {
throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
index 124569b..bbdcfec 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
import org.openrdf.model.Statement;
import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
public class ConstructGraphTestUtils {
@@ -47,8 +48,18 @@ public class ConstructGraphTestUtils {
public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
- subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
- subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+ for(RyaSubGraph subgraph: subgraph1) {
+ int key = getKey(subgraph);
+ subGraphMap.put(key, subgraph);
+ }
+
+ for(RyaSubGraph subgraph: subgraph2) {
+ int key = getKey(subgraph);
+ RyaSubGraph sub = subGraphMap.get(key);
+ Preconditions.checkNotNull(sub);
+ Set<RyaStatement> statements = sub.getStatements();
+ ryaStatementsEqualIgnoresBlankNode(subgraph.getStatements(), statements);
+ }
}
private static int getKey(RyaSubGraph subgraph) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index d5c0e5f..263a19e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -68,7 +68,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0);
@@ -95,7 +95,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
"?x <http://worksAt> <http://Chipotle>." +
"}";
final String q1PcjId = pcjStorage.createPcj(q1Sparql);
- final CreatePcj createPcj = new CreatePcj();
+ final CreateFluoPcj createPcj = new CreateFluoPcj();
createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
final String q2Sparql =
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 965a7b9..c0f0f16 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -85,7 +85,7 @@ public class GetQueryReportIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Stream the data into Fluo.
new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
@@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase {
final FluoQuery fluoQuery = report.getFluoQuery();
- final String queryNodeId = fluoQuery.getQueryMetadata().get().getNodeId();
+ final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId();
expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index d403404..315dddb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -28,11 +30,12 @@ import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
@@ -113,7 +116,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Create the object that will be serialized.
final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId");
- builder.setVariableOrder(new VariableOrder("g;y;s"));
+ builder.setVarOrder(new VariableOrder("g;y;s"));
builder.setJoinType(JoinType.NATURAL_JOIN);
builder.setParentNodeId("parentNodeId");
builder.setLeftChildNodeId("leftChildNodeId");
@@ -143,10 +146,13 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
// Create the object that will be serialized.
- final QueryMetadata.Builder builder = QueryMetadata.builder("nodeId");
- builder.setVariableOrder(new VariableOrder("y;s;d"));
+ String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+ final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
+ builder.setQueryType(QueryType.Projection);
+ builder.setVarOrder(new VariableOrder("y;s;d"));
builder.setSparql("sparql string");
builder.setChildNodeId("childNodeId");
+ builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka)));
final QueryMetadata originalMetadata = builder.build();
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -159,7 +165,37 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Read it from the Fluo table.
QueryMetadata storedMetdata = null;
try(Snapshot sx = fluoClient.newSnapshot()) {
- storedMetdata = dao.readQueryMetadata(sx, "nodeId");
+ storedMetdata = dao.readQueryMetadata(sx, queryId);
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetdata);
+ }
+ }
+
+ @Test
+ public void projectionMetadataTest() {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ final ProjectionMetadata.Builder builder = ProjectionMetadata.builder("nodeId");
+ builder.setVarOrder(new VariableOrder("y;s;d"));
+ builder.setProjectedVars(new VariableOrder("x;y;z"));
+ builder.setChildNodeId("childNodeId");
+ builder.setParentNodeId("parentNodeId");
+ final ProjectionMetadata originalMetadata = builder.build();
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ ProjectionMetadata storedMetdata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetdata = dao.readProjectionMetadata(sx, "nodeId");
}
// Ensure the deserialized object is the same as the serialized one.
@@ -180,8 +216,9 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Create the object that will be serialized.
final ConstructQueryMetadata.Builder builder = ConstructQueryMetadata.builder();
builder.setNodeId("nodeId");
- builder.setSparql(query);
builder.setChildNodeId("childNodeId");
+ builder.setParentNodeId("parentNodeId");
+ builder.setVarOrder(new VariableOrder("a;b;c"));
builder.setConstructGraph(new ConstructGraph(patterns));
final ConstructQueryMetadata originalMetadata = builder.build();
@@ -209,7 +246,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Create the object that will be serialized.
final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
- .setVariableOrder(new VariableOrder("totalCount"))
+ .setVarOrder(new VariableOrder("totalCount"))
.setParentNodeId("parentNodeId")
.setChildNodeId("childNodeId")
.setGroupByVariableOrder(new VariableOrder("a", "b", "c"))
@@ -240,7 +277,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Create the object that will be serialized.
final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId")
- .setVariableOrder(new VariableOrder("totalCount"))
+ .setVarOrder(new VariableOrder("totalCount"))
.setParentNodeId("parentNodeId")
.setChildNodeId("childNodeId")
.addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount"))
@@ -315,8 +352,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
"?worker <http://worksAt> <http://Chipotle>. " +
"}";
- final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
- final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ builder.setSparql(sparql);
+ builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+ final FluoQuery originalQuery = builder.build();
assertEquals(QueryType.Projection, originalQuery.getQueryType());
assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent());
@@ -331,7 +370,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Read it from the Fluo table.
FluoQuery storedQuery = null;
try(Snapshot sx = fluoClient.newSnapshot()) {
- storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().get().getNodeId());
+ storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
}
// Ensure the deserialized object is the same as the serialized one.
@@ -354,11 +393,102 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
"?worker <http://worksAt> <http://Chipotle>. " +
"}";
- final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
- final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ builder.setSparql(sparql);
+ builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+ final FluoQuery originalQuery = builder.build();
+
+ assertEquals(QueryType.Construct, originalQuery.getQueryType());
+ assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent());
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalQuery);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ FluoQuery storedQuery = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalQuery, storedQuery);
+ }
+ }
+
+
+ @Test
+ public void fluoNestedQueryTest() throws MalformedQueryException {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ final String sparql =
+ "SELECT ?id ?type ?location ?averagePrice ?vendor {" +
+ "FILTER(?averagePrice > 4) " +
+ "?type <urn:purchasedFrom> ?vendor ." +
+ "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+ "?id <urn:type> ?type . " +
+ "?id <urn:location> ?location ." +
+ "?id <urn:price> ?price ." +
+ "} " +
+ "GROUP BY ?type ?location }}";
+
+
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ builder.setSparql(sparql);
+ builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+ final FluoQuery originalQuery = builder.build();
+
+ assertEquals(QueryType.Projection, originalQuery.getQueryType());
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalQuery);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ FluoQuery storedQuery = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalQuery, storedQuery);
+ }
+ }
+
+ @Test
+ public void fluoNestedConstructQueryTest() throws MalformedQueryException {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ final String sparql = "CONSTRUCT { "
+ + "_:b a <urn:highSpeedTrafficArea> . "
+ + "_:b <urn:hasCount> ?obsCount . "
+ + "_:b <urn:hasLocation> ?location ."
+ + "_:b <urn:hasAverageVelocity> ?avgVelocity ."
+ + "} WHERE { "
+ + "FILTER(?obsCount > 1) "
+ + "{ "
+ + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) "
+ + "WHERE { "
+ + "FILTER(?velocity > 75) "
+ + "?obs <urn:hasVelocity> ?velocity. "
+ + "?obs <urn:hasLocation> ?location. "
+ + "}GROUP BY ?location }}";
+
+
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ builder.setSparql(sparql);
+ builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+ final FluoQuery originalQuery = builder.build();
assertEquals(QueryType.Construct, originalQuery.getQueryType());
- assertEquals(false, originalQuery.getQueryMetadata().isPresent());
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Write it to the Fluo table.
@@ -370,11 +500,12 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Read it from the Fluo table.
FluoQuery storedQuery = null;
try(Snapshot sx = fluoClient.newSnapshot()) {
- storedQuery = dao.readFluoQuery(sx, originalQuery.getConstructQueryMetadata().get().getNodeId());
+ storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
}
// Ensure the deserialized object is the same as the serialized one.
assertEquals(originalQuery, storedQuery);
}
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
index 0cd7cfb..1707308 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
@@ -41,7 +41,7 @@ import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
@@ -91,7 +91,7 @@ public class BatchDeleteIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+ String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
List<String> ids = getNodeIdStrings(fluoClient, queryId);
List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
@@ -130,7 +130,7 @@ public class BatchDeleteIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+ String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(1);
@@ -176,7 +176,7 @@ public class BatchDeleteIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+ String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 0f2d892..7c4caa4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -35,7 +35,7 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import org.openrdf.model.Statement;
@@ -79,10 +79,10 @@ public class CreateDeleteIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(17, rows.size());
+ assertEquals(20, rows.size());
// Delete the PCJ from the Fluo application.
- new DeletePcj(1).deletePcj(fluoClient, pcjId);
+ new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
@@ -111,10 +111,10 @@ public class CreateDeleteIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(10, rows.size());
+ assertEquals(12, rows.size());
// Delete the PCJ from the Fluo application.
- new DeletePcj(1).deletePcj(fluoClient, pcjId);
+ new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index f330825..d623043 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -102,7 +102,7 @@ public class InputIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Verify the end results of the query match the expected results.
super.getMiniFluo().waitForObservers();
@@ -162,7 +162,7 @@ public class InputIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Ensure the query has no results yet.
super.getMiniFluo().waitForObservers();
@@ -228,7 +228,7 @@ public class InputIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Ensure Alice is a match.
super.getMiniFluo().waitForObservers();
@@ -317,7 +317,7 @@ public class InputIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Tell the Fluo app to maintain the PCJ.
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
+ new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName());
// Ensure Alice is a match.
super.getMiniFluo().waitForObservers();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index ab7610d..3ee07a7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -29,6 +29,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -244,6 +247,10 @@ public class KafkaExportIT extends KafkaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
+
+ try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
+ FluoITHelper.printFluoTable(fluo);
+ }
// Create the expected results of the SPARQL query once the PCJ has been computed.
final MapBindingSet expectedResult = new MapBindingSet();
@@ -350,7 +357,7 @@ public class KafkaExportIT extends KafkaExportITBase {
}
@Test
- public void groupByManyBindings_avaerages() throws Exception {
+ public void groupByManyBindings_averages() throws Exception {
// A query that groups what is aggregated by two of the keys.
final String sparql =
"SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
@@ -425,6 +432,160 @@ public class KafkaExportIT extends KafkaExportITBase {
assertEquals(expectedResults, results);
}
+
+ @Test
+ public void nestedGroupByManyBindings_averages() throws Exception {
+ // A query that groups what is aggregated by two of the keys.
+ final String sparql =
+ "SELECT ?type ?location ?averagePrice {" +
+ "FILTER(?averagePrice > 4) " +
+ "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+ "?id <urn:type> ?type . " +
+ "?id <urn:location> ?location ." +
+ "?id <urn:price> ?price ." +
+ "} " +
+ "GROUP BY ?type ?location }}";
+
+ // Create the Statements that will be loaded into Rya.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Collection<Statement> statements = Sets.newHashSet(
+ // American items that will be averaged.
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createLiteral("apple")),
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(2.50)),
+
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(4.25)),
+
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)),
+
+ // French items that will be averaged.
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createLiteral("cheese")),
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")),
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(8.5)),
+
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("France")),
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)),
+
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")),
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("France")),
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+ // Create the PCJ in Fluo and load the statements into Rya.
+ final String pcjId = loadData(sparql, statements);
+
+ // Create the expected results of the SPARQL query once the PCJ has been computed.
+ final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+ bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL));
+ expectedResults.add( new VisibilityBindingSet(bs));
+
+ bs = new MapBindingSet();
+ bs.addBinding("type", vf.createLiteral("cigarettes", XMLSchema.STRING));
+ bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL));
+ expectedResults.add( new VisibilityBindingSet(bs) );
+
+ bs = new MapBindingSet();
+ bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING));
+ bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("4.75", XMLSchema.DECIMAL));
+ expectedResults.add( new VisibilityBindingSet(bs) );
+
+ // Verify the end results of the query match the expected results.
+ final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
+ System.out.println(results);
+ assertEquals(expectedResults, results);
+ }
+
+
+ @Test
+ public void nestedWithJoinGroupByManyBindings_averages() throws Exception {
+
+ // A query that groups what is aggregated by two of the keys.
+ final String sparql =
+ "SELECT ?type ?location ?averagePrice ?milkType {" +
+ "FILTER(?averagePrice > 4) " +
+ "?type <urn:hasMilkType> ?milkType ." +
+ "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" +
+ "?id <urn:type> ?type . " +
+ "?id <urn:location> ?location ." +
+ "?id <urn:price> ?price ." +
+ "} " +
+ "GROUP BY ?type ?location }}";
+
+ // Create the Statements that will be loaded into Rya.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Collection<Statement> statements = Sets.newHashSet(
+
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createURI("urn:blue")),
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("France")),
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(8.5)),
+ vf.createStatement(vf.createURI("urn:blue"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)),
+
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createURI("urn:american")),
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+ vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(.99)),
+
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createURI("urn:cheddar")),
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")),
+ vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)),
+
+ // French items that will be averaged.
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createURI("urn:goat")),
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")),
+ vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(6.5)),
+ vf.createStatement(vf.createURI("urn:goat"), vf.createURI("urn:hasMilkType"), vf.createLiteral("goat", XMLSchema.STRING)),
+
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createURI("urn:fontina")),
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("Italy")),
+ vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)),
+ vf.createStatement(vf.createURI("urn:fontina"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)),
+
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createURI("urn:fontina")),
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("Italy")),
+ vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99)));
+
+ // Create the PCJ in Fluo and load the statements into Rya.
+ final String pcjId = loadData(sparql, statements);
+
+ // Create the expected results of the SPARQL query once the PCJ has been computed.
+ final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("type", vf.createURI("urn:blue"));
+ bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL));
+ bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING));
+ expectedResults.add( new VisibilityBindingSet(bs));
+
+ bs = new MapBindingSet();
+ bs.addBinding("type", vf.createURI("urn:goat"));
+ bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("6.5", XMLSchema.DECIMAL));
+ bs.addBinding("milkType", vf.createLiteral("goat", XMLSchema.STRING));
+ expectedResults.add( new VisibilityBindingSet(bs) );
+
+ bs = new MapBindingSet();
+ bs.addBinding("type", vf.createURI("urn:fontina"));
+ bs.addBinding("location", vf.createLiteral("Italy", XMLSchema.STRING));
+ bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL));
+ bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING));
+ expectedResults.add( new VisibilityBindingSet(bs) );
+
+ // Verify the end results of the query match the expected results.
+ final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
+ System.out.println(results);
+ assertEquals(expectedResults, results);
+ }
+
private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
requireNonNull(pcjId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
index 7a4ed8d..ca8de0d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -43,24 +43,28 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.junit.Assert;
import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.XMLSchema;
import com.google.common.collect.Sets;
@@ -83,6 +87,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+ observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
// Configure the export observer to export new PCJ results to the mini
// accumulo cluster.
@@ -285,6 +290,77 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);
}
+
+ @Test
+ public void nestedConstructQuery() throws Exception {
+ // A query that groups what is aggregated by one of the keys.
+ final String sparql = "CONSTRUCT { "
+ + "_:b a <urn:highSpeedTrafficArea> . "
+ + "_:b <urn:hasCount> ?obsCount . "
+ + "_:b <urn:hasLocation> ?location ."
+ + "_:b <urn:hasAverageVelocity> ?avgVelocity ."
+ + "} WHERE { "
+ + "FILTER(?obsCount > 1) "
+ + "{ "
+ + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) "
+ + "WHERE { "
+ + "FILTER(?velocity > 75) "
+ + "?obs <urn:hasVelocity> ?velocity. "
+ + "?obs <urn:hasLocation> ?location. "
+ + "}GROUP BY ?location }}";
+
+ // Create the Statements that will be loaded into Rya.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Collection<Statement> statements = Sets.newHashSet(
+ vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)),
+ vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+ vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)),
+ vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+ vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasVelocity"), vf.createLiteral(70)),
+ vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")),
+ vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)),
+ vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")),
+ vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)),
+ vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")),
+ vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasVelocity"), vf.createLiteral(67)),
+ vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasLocation"), vf.createLiteral("Clarendon")),
+ vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)),
+ vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasLocation"), vf.createLiteral("Ballston")),
+ vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)),
+ vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasLocation"), vf.createLiteral("FallsChurch")));
+
+ // Create the PCJ in Fluo and load the statements into Rya.
+ final String pcjId = loadStatements(sparql, statements);
+
+ // Verify the end results of the query match the expected results.
+ final Set<RyaSubGraph> results = readAllResults(pcjId);
+
+ RyaStatement statement1 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2"));
+ RyaStatement statement2 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "84"));
+ RyaStatement statement3 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasLocation"), new RyaType("Rosslyn"));
+ RyaStatement statement4 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea"));
+ RyaStatement statement5 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2"));
+ RyaStatement statement6 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "79"));
+ RyaStatement statement7 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasLocation"), new RyaType("OldTown"));
+ RyaStatement statement8 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea"));
+
+ final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+ RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+ Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement1, statement2, statement3, statement4));
+ subGraph1.setStatements(stmnts1);
+ expectedResults.add(subGraph1);
+
+ RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+ Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement5, statement6, statement7, statement8));
+ subGraph2.setStatements(stmnts2);
+ expectedResults.add(subGraph2);
+
+ Assert.assertEquals(expectedResults.size(), results.size());
+ ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);;
+ }
+
+
protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final String TopicName) {
// setup consumer
final Properties consumerProps = new Properties();
@@ -330,9 +406,9 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
FluoClient client = null;
try {
- CreatePcj createPcj = new CreatePcj();
+ CreateFluoPcj createPcj = new CreateFluoPcj();
client = new FluoClientImpl(super.getFluoConfiguration());
- FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql);
+ String id = createPcj.createPcj(sparql, client).getQueryId();
AccumuloRyaDAO dao = getRyaDAO();
dao.add(statements.iterator());
@@ -341,7 +417,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
super.getMiniFluo().waitForObservers();
// FluoITHelper.printFluoTable(client);
- return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+ return id;
} finally {
if (client != null) {
client.close();