You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:14 UTC
[2/4] incubator-rya git commit: RYA-273-Construct Query Support.
Closes #161.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 77d6a49..3396114 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -46,6 +46,18 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* </table>
* </p>
* <p>
+ * <b>Construct Query Metadata</b>
+ * <table border="1" style="width:100%">
+ * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
+ * </table>
+ * </p>
+ * <p>
* <b>Filter Metadata</b>
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
@@ -104,6 +116,7 @@ public class FluoQueryColumns {
public static final String JOIN_METADATA_CF = "joinMetadata";
public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
+ public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
/**
* New triples that have been added to Rya are written as a row in this
@@ -151,6 +164,14 @@ public class FluoQueryColumns {
public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId");
public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
+ // Construct Query Metadata columns.
+ public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId");
+ public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder");
+ public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph");
+ public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId");
+ public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements");
+ public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql");
+
// Filter Metadata columns.
public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder");
@@ -201,6 +222,18 @@ public class FluoQueryColumns {
QUERY_CHILD_NODE_ID)),
/**
+ * The columns a {@link ConstructQueryMetadata} object's fields are stored within.
+ */
+ CONSTRUCT_COLUMNS(
+ Arrays.asList(CONSTRUCT_NODE_ID,
+ CONSTRUCT_VARIABLE_ORDER,
+ CONSTRUCT_GRAPH,
+ CONSTRUCT_CHILD_NODE_ID,
+ CONSTRUCT_SPARQL,
+ CONSTRUCT_STATEMENTS)),
+
+
+ /**
* The columns a {@link FilterMetadata} object's fields are stored within.
*/
FILTER_COLUMNS(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index dfc3333..5e9d654 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -31,7 +31,8 @@ import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -39,6 +40,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -103,6 +105,59 @@ public class FluoQueryMetadataDAO {
}
/**
+ * Write an instance of {@link ConstructQueryMetadata} to the Fluo table.
+ *
+ * @param tx - The transaction that will be used to commit the metadata. (not null)
+ * @param metadata - The Construct Query node metadata that will be written to the table. (not null)
+ */
+ public void write(final TransactionBase tx, final ConstructQueryMetadata metadata) {
+ requireNonNull(tx);
+ requireNonNull(metadata);
+
+ final String rowId = metadata.getNodeId();
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId);
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() );
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql());
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph()));
+ }
+
+ /**
+ * Read an instance of {@link ConstructQueryMetadata} from the Fluo table.
+ *
+ * @param sx - The snapshot that will be used to read the metadata . (not null)
+ * @param nodeId - The nodeId of the Construct Query node that will be read. (not null)
+ * @return The {@link ConstructQueryMetadata} that was read from table.
+ */
+ public ConstructQueryMetadata readConstructQueryMetadata(final SnapshotBase sx, final String nodeId) {
+ return readConstructQueryMetadataBuilder(sx, nodeId).build();
+ }
+
+ private ConstructQueryMetadata.Builder readConstructQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+ requireNonNull(sx);
+ requireNonNull(nodeId);
+
+ // Fetch the values from the Fluo table.
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
+ FluoQueryColumns.CONSTRUCT_GRAPH,
+ FluoQueryColumns.CONSTRUCT_SPARQL,
+ FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+
+ final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH);
+ final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
+ final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+ final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL);
+
+ return ConstructQueryMetadata.builder()
+ .setNodeId(nodeId)
+ .setConstructGraph(graph)
+ .setSparql(sparql)
+ .setChildNodeId(childNodeId);
+ }
+
+
+ /**
* Write an instance of {@link FilterMetadata} to the Fluo table.
*
* @param tx - The transaction that will be used to commit the metadata. (not null)
@@ -376,13 +431,25 @@ public class FluoQueryMetadataDAO {
requireNonNull(tx);
requireNonNull(query);
- // Store the Query ID so that it may be looked up from the original SPARQL string.
- final String sparql = query.getQueryMetadata().getSparql();
- final String queryId = query.getQueryMetadata().getNodeId();
- tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
-
// Write the rest of the metadata objects.
- write(tx, query.getQueryMetadata());
+ switch(query.getQueryType()) {
+ case Construct:
+ ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
+ // Store the Query ID so that it may be looked up from the original SPARQL string.
+ final String constructSparql = constructMetadata.getSparql();
+ final String constructQueryId = constructMetadata.getNodeId();
+ tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
+ write(tx, constructMetadata);
+ break;
+ case Projection:
+ QueryMetadata queryMetadata = query.getQueryMetadata().get();
+ // Store the Query ID so that it may be looked up from the original SPARQL string.
+ final String sparql = queryMetadata.getSparql();
+ final String queryId = queryMetadata.getNodeId();
+ tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
+ write(tx, queryMetadata);
+ break;
+ }
for(final FilterMetadata filter : query.getFilterMetadata()) {
write(tx, filter);
@@ -423,50 +490,62 @@ public class FluoQueryMetadataDAO {
requireNonNull(childNodeId);
final NodeType childType = NodeType.fromNodeId(childNodeId).get();
- switch(childType) {
- case QUERY:
- // Add this node's metadata.
- final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
- builder.setQueryMetadata(queryBuilder);
-
- // Add it's child's metadata.
- addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
- break;
-
- case JOIN:
- // Add this node's metadata.
- final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
- builder.addJoinMetadata(joinBuilder);
-
- // Add it's children's metadata.
- final JoinMetadata joinMetadata = joinBuilder.build();
- addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
- addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
- break;
-
- case FILTER:
- // Add this node's metadata.
- final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
- builder.addFilterMetadata(filterBuilder);
-
- // Add it's child's metadata.
- addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
- break;
-
- case STATEMENT_PATTERN:
- // Add this node's metadata.
- final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
- builder.addStatementPatternBuilder(spBuilder);
- break;
-
- case AGGREGATION:
- // Add this node's metadata.
- final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
- builder.addAggregateMetadata(aggregationBuilder);
-
- // Add it's child's metadata.
- addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
- break;
+ switch (childType) {
+ case QUERY:
+ // Add this node's metadata.
+ final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
+ Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
+ builder.setQueryMetadata(queryBuilder);
+
+ // Add it's child's metadata.
+ addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
+ break;
+
+ case CONSTRUCT:
+ final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId);
+ Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
+ builder.setConstructQueryMetadata(constructBuilder);
+
+ // Add it's child's metadata.
+ addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId());
+ break;
+
+ case AGGREGATION:
+ // Add this node's metadata.
+ final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
+ builder.addAggregateMetadata(aggregationBuilder);
+
+ // Add it's child's metadata.
+ addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
+ break;
+
+ case JOIN:
+ // Add this node's metadata.
+ final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId);
+ builder.addJoinMetadata(joinBuilder);
+
+ // Add it's children's metadata.
+ final JoinMetadata joinMetadata = joinBuilder.build();
+ addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId());
+ addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId());
+ break;
+
+ case FILTER:
+ // Add this node's metadata.
+ final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId);
+ builder.addFilterMetadata(filterBuilder);
+
+ // Add it's child's metadata.
+ addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId());
+ break;
+
+ case STATEMENT_PATTERN:
+ // Add this node's metadata.
+ final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
+ builder.addStatementPatternBuilder(spBuilder);
+ break;
+ default:
+ break;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index 064cfe8..23ac286 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -129,7 +129,7 @@ public class QueryMetadata extends CommonNodeMetadata {
@DefaultAnnotation(NonNull.class)
public static final class Builder {
- private final String nodeId;
+ private String nodeId;
private VariableOrder varOrder;
private String sparql;
private String childNodeId;
@@ -143,6 +143,7 @@ public class QueryMetadata extends CommonNodeMetadata {
this.nodeId = checkNotNull(nodeId);
}
+
/**
* Set the variable order of binding sets that are emitted by this node.
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 562470a..631ce60 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,7 +19,9 @@
package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -28,6 +30,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -35,6 +38,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
@@ -42,21 +47,33 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.Aggregatio
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.BNodeImpl;
import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.BNodeGenerator;
import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
import org.openrdf.query.algebra.Filter;
import org.openrdf.query.algebra.Group;
import org.openrdf.query.algebra.GroupElem;
import org.openrdf.query.algebra.Join;
import org.openrdf.query.algebra.LeftJoin;
+import org.openrdf.query.algebra.MultiProjection;
import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -158,16 +175,18 @@ public class SparqlFluoQueryBuilder {
// Create the prefix of the id. This makes it a little bit more human readable.
String prefix;
- if(node instanceof StatementPattern) {
+ if (node instanceof StatementPattern) {
prefix = SP_PREFIX;
- } else if(node instanceof Filter) {
+ } else if (node instanceof Filter) {
prefix = FILTER_PREFIX;
- } else if(node instanceof Join || node instanceof LeftJoin) {
+ } else if (node instanceof Join || node instanceof LeftJoin) {
prefix = JOIN_PREFIX;
- } else if(node instanceof Projection) {
+ } else if (node instanceof Projection) {
prefix = QUERY_PREFIX;
} else if(node instanceof Extension) {
prefix = AGGREGATION_PREFIX;
+ } else if (node instanceof Reduced) {
+ prefix = CONSTRUCT_PREFIX;
} else {
throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
}
@@ -402,7 +421,7 @@ public class SparqlFluoQueryBuilder {
final QueryModelNode child = node.getArg();
if(child == null) {
- throw new IllegalArgumentException("Filter arg connot be null.");
+ throw new IllegalArgumentException("Projection arg connot be null.");
}
final String childNodeId = nodeIds.getOrMakeId(child);
@@ -417,6 +436,60 @@ public class SparqlFluoQueryBuilder {
// Walk to the next node.
super.meet(node);
}
+
+
+ public void meet(Reduced node) {
+ //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata
+ //builder with FluoQueryBuilder, and add metadata that we currently have
+ final String constructId = nodeIds.getOrMakeId(node);
+ final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder();
+ constructBuilder.setNodeId(constructId);
+ fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+ constructBuilder.setSparql(sparql);
+
+ //get child node
+ QueryModelNode child = node.getArg();
+ Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection);
+ UnaryTupleOperator unary = (UnaryTupleOperator) child;
+
+ //get ProjectionElemList to build ConstructGraph
+ final List<ProjectionElemList> projections = new ArrayList<>();
+ if(unary instanceof Projection) {
+ projections.add(((Projection) unary).getProjectionElemList());
+ } else {
+ projections.addAll(((MultiProjection)unary).getProjections());
+ }
+
+ //get ExtensionElems to build ConstructGraph
+ QueryModelNode grandChild = unary.getArg();
+ Preconditions.checkArgument(grandChild instanceof Extension);
+ Extension extension = (Extension) grandChild;
+ final List<ExtensionElem> extensionElems = extension.getElements();
+ final ConstructGraph graph = getConstructGraph(projections, extensionElems);
+ constructBuilder.setConstructGraph(graph);
+
+ //set child to the next node we care about in Fluo
+ //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension
+ //otherwise set child to Extension's child (only care about Extensions if they are Aggregations)
+ if(extension.getArg() instanceof Group) {
+ child = extension;
+ } else {
+ child = extension.getArg();
+ }
+
+ //Set the child node in the ConstructQueryMetadataBuilder
+ String childNodeId = nodeIds.getOrMakeId(child);
+ constructBuilder.setChildNodeId(childNodeId);
+
+ // Update the child node's metadata.
+ final Set<String> childVars = getVars((TupleExpr)child);
+ final VariableOrder childVarOrder = new VariableOrder(childVars);
+ setChildMetadata(childNodeId, childVarOrder, constructId);
+
+ //fast forward visitor to next node we care about
+ child.visit(this);
+ }
+
/**
* Update a query node's metadata to include it's binding set variable order
@@ -433,57 +506,102 @@ public class SparqlFluoQueryBuilder {
checkNotNull(parentNodeId);
final NodeType childType = NodeType.fromNodeId(childNodeId).get();
- switch(childType) {
- case STATEMENT_PATTERN:
- StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
- if(spBuilder == null) {
- spBuilder = StatementPatternMetadata.builder(childNodeId);
- fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
- }
+ switch (childType) {
+ case STATEMENT_PATTERN:
+ StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+ if (spBuilder == null) {
+ spBuilder = StatementPatternMetadata.builder(childNodeId);
+ fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
+ }
- spBuilder.setVarOrder(childVarOrder);
- spBuilder.setParentNodeId(parentNodeId);
- break;
+ spBuilder.setVarOrder(childVarOrder);
+ spBuilder.setParentNodeId(parentNodeId);
+ break;
- case JOIN:
- JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
- if(joinBuilder == null) {
- joinBuilder = JoinMetadata.builder(childNodeId);
- fluoQueryBuilder.addJoinMetadata(joinBuilder);
- }
+ case JOIN:
+ JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+ if (joinBuilder == null) {
+ joinBuilder = JoinMetadata.builder(childNodeId);
+ fluoQueryBuilder.addJoinMetadata(joinBuilder);
+ }
- joinBuilder.setVariableOrder(childVarOrder);
- joinBuilder.setParentNodeId(parentNodeId);
- break;
+ joinBuilder.setVariableOrder(childVarOrder);
+ joinBuilder.setParentNodeId(parentNodeId);
+ break;
- case FILTER:
- FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
- if(filterBuilder == null) {
- filterBuilder = FilterMetadata.builder(childNodeId);
- fluoQueryBuilder.addFilterMetadata(filterBuilder);
- }
+ case FILTER:
+ FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+ if (filterBuilder == null) {
+ filterBuilder = FilterMetadata.builder(childNodeId);
+ fluoQueryBuilder.addFilterMetadata(filterBuilder);
+ }
- filterBuilder.setVarOrder(childVarOrder);
- filterBuilder.setParentNodeId(parentNodeId);
- break;
+ filterBuilder.setVarOrder(childVarOrder);
+ filterBuilder.setParentNodeId(parentNodeId);
+ break;
- case AGGREGATION:
- AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
- if(aggregationBuilder == null) {
- aggregationBuilder = AggregationMetadata.builder(childNodeId);
- fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
- }
+ case AGGREGATION:
+ AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+ if (aggregationBuilder == null) {
+ aggregationBuilder = AggregationMetadata.builder(childNodeId);
+ fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+ }
- aggregationBuilder.setVariableOrder(childVarOrder);
- aggregationBuilder.setParentNodeId(parentNodeId);
- break;
+ aggregationBuilder.setVariableOrder(childVarOrder);
+ aggregationBuilder.setParentNodeId(parentNodeId);
+ break;
- case QUERY:
- throw new IllegalArgumentException("QUERY nodes do not have children.");
- default:
- throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+ case QUERY:
+ throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+ case CONSTRUCT:
+ throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
+ default:
+ throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+ }
+ }
+
+ private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) {
+ Map<String, Value> valueMap = new HashMap<>();
+ //create valueMap to associate source names with Values
+ for(ExtensionElem elem: extensionElems) {
+ String name = elem.getName();
+ ValueExpr expr = elem.getExpr();
+ if(expr instanceof ValueConstant) {
+ Value value = ((ValueConstant) expr).getValue();
+ valueMap.put(name, value);
+ } else if(expr instanceof BNodeGenerator) {
+ valueMap.put(name, new BNodeImpl(UUID.randomUUID().toString()));
+ }
+ }
+
+ Set<ConstructProjection> constructProj = new HashSet<>();
+ //build ConstructProjection for each ProjectionElemList
+ for(ProjectionElemList list: projections) {
+ validateProjectionElemList(list);
+ List<Var> vars = new ArrayList<>();
+ for(ProjectionElem elem: list.getElements()) {
+ String sourceName = elem.getSourceName();
+ Var var = new Var(sourceName);
+ if(valueMap.containsKey(sourceName)) {
+ var.setValue(valueMap.get(sourceName));
+ }
+ vars.add(var);
+ }
+ constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2)));
}
+
+ return new ConstructGraph(constructProj);
+ }
+
+ private void validateProjectionElemList(ProjectionElemList list) {
+ List<ProjectionElem> elements = list.getElements();
+ checkArgument(elements.size() == 3);
+ checkArgument(elements.get(0).getTargetName().equals("subject"));
+ checkArgument(elements.get(1).getTargetName().equals("predicate"));
+ checkArgument(elements.get(2).getTargetName().equals("object"));
}
+
+
/**
* Get the non-constant variables from a {@link TupleExpr}.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
new file mode 100644
index 0000000..94c8571
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java
@@ -0,0 +1,145 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.Sets;
+
+public class ConstructGraphTest {
+
+ private ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void testConstructGraph() throws MalformedQueryException, UnsupportedEncodingException {
+ String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructGraph graph = new ConstructGraph(patterns);
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("x", vf.createURI("uri:Joe"));
+ bs.addBinding("y", vf.createURI("uri:Bob"));
+ bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs,"FOUO");
+ Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+
+ RyaStatement statement1 = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+ RyaStatement statement2 = new RyaStatement(new RyaURI("uri:Bob"), new RyaURI("uri:worksAt"), new RyaURI("uri:BurgerShack"));
+ Set<RyaStatement> expected = Sets.newHashSet(Arrays.asList(statement1, statement2));
+ expected.forEach(x-> x.setColumnVisibility("FOUO".getBytes()));
+ ConstructGraphTestUtils.ryaStatementSetsEqualIgnoresTimestamp(expected, statements);
+ }
+
+ @Test
+ public void testConstructGraphBNode() throws MalformedQueryException {
+ String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?z }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructGraph graph = new ConstructGraph(patterns);
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("x", vf.createURI("uri:Joe"));
+ bs.addBinding("z", vf.createURI("uri:BurgerShack"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+ Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs);
+ Set<RyaStatement> statements2 = graph.createGraphFromBindingSet(vBs);
+
+ RyaURI subject = null;
+ for(RyaStatement statement: statements) {
+ RyaURI subjURI = statement.getSubject();
+ if(subject == null) {
+ subject = subjURI;
+ } else {
+ assertEquals(subjURI, subject);
+ }
+ }
+ RyaURI subject2 = null;
+ for(RyaStatement statement: statements2) {
+ RyaURI subjURI = statement.getSubject();
+ if(subject2 == null) {
+ subject2 = subjURI;
+ } else {
+ assertEquals(subjURI, subject2);
+ }
+ }
+
+ assertTrue(!subject.equals(subject2));
+
+ ConstructGraphTestUtils.ryaStatementsEqualIgnoresBlankNode(statements, statements2);
+ }
+
+
+ @Test
+ public void testConstructGraphSerializer() throws MalformedQueryException {
+
+ String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructGraph graph = new ConstructGraph(patterns);
+
+ String constructString = ConstructGraphSerializer.toConstructString(graph);
+ ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+
+ assertEquals(graph, deserialized);
+
+ }
+
+ @Test
+ public void testConstructGraphSerializerBlankNode() throws MalformedQueryException {
+
+ String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?y }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructGraph graph = new ConstructGraph(patterns);
+
+ String constructString = ConstructGraphSerializer.toConstructString(graph);
+ ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString);
+
+ assertEquals(graph, deserialized);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..a12b6de
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+public class ConstructGraphTestUtils {
+
+ public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+ Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+ }
+
+ public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+ Set<VisibilityStatementSet> set1 = new HashSet<>();
+ Set<VisibilityStatementSet> set2 = new HashSet<>();
+ subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements())));
+ subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements())));
+ Assert.assertEquals(set1, set2);
+ }
+
+ public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+ Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
+ subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
+ subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+ }
+
+ private static int getKey(RyaSubGraph subgraph) {
+ int key = 0;
+ for(RyaStatement statement: subgraph.getStatements()) {
+ key += statement.getObject().hashCode();
+ }
+ return key;
+ }
+
+ public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+ Map<String, RyaURI> bNodeMap = new HashMap<>();
+ statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject()));
+ statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData())));
+ ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+ }
+
+
+ /**
+ * Class used for comparing Sets of RyaStatements while ignoring timestamps.
+ * It is assumed that all RyaStatements in the Set used to construct this class
+ * have the same visibility.
+ */
+ public static class VisibilityStatementSet {
+
+ private Set<Statement> statements;
+ private String visibility;
+
+ public VisibilityStatementSet(Set<RyaStatement> statements) {
+ this.statements = new HashSet<>();
+ statements.forEach(x -> {
+ this.statements.add(RyaToRdfConversions.convertStatement(x));
+ if (visibility == null) {
+ if (x.getColumnVisibility() != null) {
+ visibility = new String(x.getColumnVisibility());
+ } else {
+ this.visibility = "";
+ }
+ }
+ });
+ }
+
+ public VisibilityStatementSet(RyaSubGraph subgraph) {
+ this(subgraph.getStatements());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof VisibilityStatementSet) {
+ VisibilityStatementSet that = (VisibilityStatementSet) o;
+ return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(visibility, statements);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ return builder.append("Visiblity Statement Set \n").append(" Statements: " + statements + "\n")
+ .append(" Visibilities: " + visibility + " \n").toString();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
new file mode 100644
index 0000000..080031e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java
@@ -0,0 +1,112 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.BNode;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+public class ConstructProjectionTest {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void testConstructProjectionProjectSubj() throws MalformedQueryException, UnsupportedEncodingException {
+ String query = "select ?x where { ?x <uri:talksTo> <uri:Bob> }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructProjection projection = new ConstructProjection(patterns.get(0));
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("x", vf.createURI("uri:Joe"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO");
+ RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+
+ RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+ expected.setColumnVisibility("FOUO".getBytes("UTF-8"));
+ expected.setTimestamp(statement.getTimestamp());
+
+ assertEquals(expected, statement);
+ }
+
+ @Test
+ public void testConstructProjectionProjPred() throws MalformedQueryException {
+ String query = "select ?p where { <uri:Joe> ?p <uri:Bob> }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructProjection projection = new ConstructProjection(patterns.get(0));
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("p", vf.createURI("uri:worksWith"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>());
+
+ RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:worksWith"), new RyaURI("uri:Bob"));
+ expected.setTimestamp(statement.getTimestamp());
+ expected.setColumnVisibility(new byte[0]);
+
+ assertEquals(expected, statement);
+ }
+
+ @Test
+ public void testConstructProjectionBNodes() throws MalformedQueryException {
+ String query = "select ?o where { _:b <uri:talksTo> ?o }";
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr());
+ ConstructProjection projection = new ConstructProjection(patterns.get(0));
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("o", vf.createURI("uri:Bob"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ BNode bNode = vf.createBNode();
+ Map<String, BNode> bNodeMap = new HashMap<>();
+ bNodeMap.put("-anon-1", bNode);
+ RyaStatement statement = projection.projectBindingSet(vBs,bNodeMap);
+
+ RyaStatement expected = new RyaStatement(RdfToRyaConversions.convertResource(bNode), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob"));
+ expected.setTimestamp(statement.getTimestamp());
+ expected.setColumnVisibility(new byte[0]);
+
+ assertEquals(expected, statement);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
index 4ad5189..60e1bc1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java
@@ -38,10 +38,8 @@ public class FluoStringConverterTest {
// Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>."
final Var subject = new Var("x");
final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
- predicate.setAnonymous(true);
predicate.setConstant(true);
final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
- object.setAnonymous(true);
object.setConstant(true);
final StatementPattern pattern = new StatementPattern(subject, predicate, object);
@@ -69,10 +67,8 @@ public class FluoStringConverterTest {
// Enusre it converted to the expected result.
final Var subject = new Var("x");
final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt"));
- predicate.setAnonymous(true);
predicate.setConstant(true);
final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
- object.setAnonymous(true);
object.setConstant(true);
final StatementPattern expected = new StatementPattern(subject, predicate, object);
@@ -89,7 +85,6 @@ public class FluoStringConverterTest {
// Ensure it converted to the expected result.
final Var expected = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle"));
- expected.setAnonymous(true);
expected.setConstant(true);
assertEquals(expected, var);
@@ -105,7 +100,6 @@ public class FluoStringConverterTest {
// Ensure it converted to the expected result.
final Var expected = new Var("-const-5", new LiteralImpl("5", XMLSchema.INTEGER));
- expected.setAnonymous(true);
expected.setConstant(true);
assertEquals(expected, result);
@@ -121,7 +115,6 @@ public class FluoStringConverterTest {
// Ensure it converted to the expected result.
final Var expected = new Var("-const-Chipotle", new LiteralImpl("Chipotle", XMLSchema.STRING));
- expected.setAnonymous(true);
expected.setConstant(true);
assertEquals(expected, result);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
new file mode 100644
index 0000000..8b9feaf
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java
@@ -0,0 +1,57 @@
+package org.apache.rya.indexing.pcj.fluo.app;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.junit.Test;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+public class RyaSubGraphKafkaSerDeTest {
+
+ private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+
+ @Test
+ public void serializationTestWithURI() {
+ RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+ bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaURI("uri:345")));
+ bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaURI("uri:789")));
+ byte[] bundleBytes = serializer.toBytes(bundle);
+ RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+ assertEquals(bundle, deserializedBundle);
+ }
+
+
+ @Test
+ public void serializationTestWithLiteral() {
+ RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString());
+ bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaType(XMLSchema.INTEGER, "345")));
+ bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaType(XMLSchema.INTEGER, "789")));
+ byte[] bundleBytes = serializer.toBytes(bundle);
+ RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes);
+ assertEquals(bundle, deserializedBundle);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index 74193cf..b9c10d4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
import org.junit.Test;
/**
@@ -93,7 +93,7 @@ public class KafkaExportParametersTest {
@Test
public void testKafkaResultExporterFactory() {
- KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
+ KafkaBindingSetExporterFactory factory = new KafkaBindingSetExporterFactory();
assertNotNull(factory);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index e1c386d..99ccc58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -55,13 +56,26 @@ public class QueryReportRenderer {
final FluoQuery metadata = queryReport.getFluoQuery();
- final QueryMetadata queryMetadata = metadata.getQueryMetadata();
- builder.appendItem( new ReportItem("QUERY NODE") );
- builder.appendItem( new ReportItem("Node ID", queryMetadata.getNodeId()) );
- builder.appendItem( new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()) );
- builder.appendItem( new ReportItem("SPARQL", prettyFormatSparql( queryMetadata.getSparql()) ) );
- builder.appendItem( new ReportItem("Child Node ID", queryMetadata.getChildNodeId()) );
- builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())) );
+ switch (metadata.getQueryType()) {
+ case Projection:
+ final QueryMetadata queryMetadata = metadata.getQueryMetadata().get();
+ builder.appendItem(new ReportItem("QUERY NODE"));
+ builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId()));
+ builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()));
+ builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql())));
+ builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId()));
+ builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())));
+ break;
+ case Construct:
+ final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
+ builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE"));
+ builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId()));
+ builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString()));
+ builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql())));
+ builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId()));
+ builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString()));
+ builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId())));
+ }
for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) {
builder.appendItem( new ReportItem("") );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 9263362..85edb11 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,100 +1,98 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.fluo.parent</artifactId>
- <version>3.2.11-incubating-SNAPSHOT</version>
- </parent>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.parent</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>rya.pcj.fluo.integration</artifactId>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.pcj.fluo.integration</artifactId>
- <name>Apache Rya PCJ Fluo Integration Tests</name>
- <description>Integration tests for the Rya Fluo application.</description>
+ <name>Apache Rya PCJ Fluo Integration Tests</name>
+ <description>Integration tests for the Rya Fluo application.</description>
- <dependencies>
- <!-- Rya Runtime Dependencies. -->
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.fluo.api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.fluo.client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.indexing</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-api</artifactId>
- </dependency>
+ <dependencies>
+ <!-- Rya Runtime Dependencies. -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
- <!-- Testing dependencies. -->
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-mini</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-api</artifactId>
- </dependency>
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-mini</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.10.1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.10.1.0</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- Testing dependencies. -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.10.1.0</version>
- <classifier>test</classifier>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-recipes-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <classifier>test</classifier>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
new file mode 100644
index 0000000..124569b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java
@@ -0,0 +1,126 @@
+package org.apache.rya.indexing.pcj.fluo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.junit.Assert;
+import org.openrdf.model.Statement;
+
+import com.google.common.base.Objects;
+
+public class ConstructGraphTestUtils {
+
+ public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+ Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2));
+ }
+
+ public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+ Set<VisibilityStatementSet> set1 = new HashSet<>();
+ Set<VisibilityStatementSet> set2 = new HashSet<>();
+ subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements())));
+ subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements())));
+ Assert.assertEquals(set1, set2);
+ }
+
+ public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) {
+ Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>();
+ subgraph1.forEach(x->subGraphMap.put(getKey(x), x));
+ subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements()));
+ }
+
+ private static int getKey(RyaSubGraph subgraph) {
+ int key = 0;
+ for(RyaStatement statement: subgraph.getStatements()) {
+ key += statement.getObject().hashCode();
+ }
+ return key;
+ }
+
+ public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) {
+ Map<String, RyaURI> bNodeMap = new HashMap<>();
+ statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject()));
+ statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData())));
+ ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2);
+ }
+
+
+ /**
+ * Class used for comparing Sets of RyaStatements while ignoring timestamps.
+ * It is assumed that all RyaStatements in the Set used to construct this class
+ * have the same visibility.
+ */
+ public static class VisibilityStatementSet {
+
+ private Set<Statement> statements;
+ private String visibility;
+
+ public VisibilityStatementSet(Set<RyaStatement> statements) {
+ this.statements = new HashSet<>();
+ statements.forEach(x -> {
+ this.statements.add(RyaToRdfConversions.convertStatement(x));
+ if (visibility == null) {
+ if (x.getColumnVisibility() != null) {
+ visibility = new String(x.getColumnVisibility());
+ } else {
+ this.visibility = "";
+ }
+ }
+ });
+ }
+
+ public VisibilityStatementSet(RyaSubGraph subgraph) {
+ this(subgraph.getStatements());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof VisibilityStatementSet) {
+ VisibilityStatementSet that = (VisibilityStatementSet) o;
+ return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(visibility, statements);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ return builder.append("Visiblity Statement Set \n").append(" Statements: " + statements + "\n")
+ .append(" Visibilities: " + visibility + " \n").toString();
+ }
+
+ }
+
+}