You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:28 UTC
[3/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
new file mode 100644
index 0000000..5a337c7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
+/**
+ * Metadata that is specific to a Projection.
+ */
+@Immutable
+@DefaultAnnotation(NonNull.class)
+public class ProjectionMetadata extends CommonNodeMetadata {
+
+ private final String childNodeId;
+ private final String parentNodeId;
+ private final VariableOrder projectedVars;
+
+ /**
+ * Constructs an instance of {@link ProjectionMetadata}.
+ *
+ * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+ * @param varOrder - The order in which binding values are written in the row to identify this result. (not null)
+ * @param childNodeId - The node whose results are projected to the query's SELECT variables. (not null)
+ * @param parentNodeId - The parent node of this projection (not null)
+ * @param projectedVars - The variables that the results are projected onto (not null)
+ */
+ public ProjectionMetadata(
+ final String nodeId,
+ final VariableOrder varOrder,
+ final String childNodeId,
+ final String parentNodeId,
+ final VariableOrder projectedVars) {
+ super(nodeId, varOrder);
+ this.childNodeId = checkNotNull(childNodeId);
+ this.parentNodeId = checkNotNull(parentNodeId);
+ this.projectedVars = checkNotNull(projectedVars);
+ }
+
+ /**
+ * @return The node whose results are projected to the query's SELECT variables.
+ */
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
+ /**
+ * @return The parent node of this projection node
+ */
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ * @return The variables that results are projected onto
+ */
+ public VariableOrder getProjectedVars() {
+ return projectedVars;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ super.getNodeId(),
+ super.getVariableOrder(),
+ projectedVars,
+ childNodeId,
+ parentNodeId);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if(this == o) {
+ return true;
+ }
+
+ if(o instanceof ProjectionMetadata) {
+ if(super.equals(o)) {
+ final ProjectionMetadata projectionMetadata = (ProjectionMetadata)o;
+ return new EqualsBuilder()
+ .append(childNodeId, projectionMetadata.childNodeId)
+ .append(parentNodeId, projectionMetadata.parentNodeId)
+ .append(projectedVars, projectionMetadata.projectedVars)
+ .isEquals();
+ }
+ return false;
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("ProjectionMetadata {\n")
+ .append(" Node ID: " + super.getNodeId() + "\n")
+ .append(" Projection Variables: " + projectedVars + "\n")
+ .append(" Variable Order: " + super.getVariableOrder() + "\n")
+ .append(" Child Node ID: " + childNodeId + "\n")
+ .append(" Parent Node ID: " + parentNodeId + "\n")
+ .append("}")
+ .toString();
+ }
+
+ /**
+ * Creates a new {@link Builder} for this class.
+ *
+ * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+ * @return A new {@link Builder} for this class.
+ */
+ public static Builder builder(final String nodeId) {
+ return new Builder(nodeId);
+ }
+
+ /**
+ * Builds instances of {@link ProjectionMetadata}.
+ */
+ @DefaultAnnotation(NonNull.class)
+ public static final class Builder implements CommonNodeMetadata.Builder {
+
+ private String nodeId;
+ private VariableOrder varOrder;
+ private String childNodeId;
+ private String parentNodeId;
+ private VariableOrder projectedVars;
+
+ /**
+ * Constructs an instance of {@link Builder}.
+ *
+ * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
+ */
+ public Builder(final String nodeId) {
+ this.nodeId = checkNotNull(nodeId);
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Set the variable order of binding sets that are emitted by this node.
+ *
+ * @param varOrder - The order in which result values are written to the row to identify this result
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
+ this.varOrder = varOrder;
+ return this;
+ }
+
+ /**
+ * @return the variable order of binding sets that are emitted by this node
+ */
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
+
+ /**
+ * Set the node whose results are projected to the query's SELECT variables.
+ *
+ * @param childNodeId - The node whose results are projected to the query's SELECT variables.
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setChildNodeId(@Nullable final String childNodeId) {
+ this.childNodeId = childNodeId;
+ return this;
+ }
+
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
+ /**
+ * Set the parent node of this projection node.
+ *
+ * @param parentNodeId - The parent node of this projection node
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setParentNodeId(@Nullable final String parentNodeId) {
+ this.parentNodeId = parentNodeId;
+ return this;
+ }
+
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ * @param projectedVars - The variables that results are projected onto
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setProjectedVars(VariableOrder projectedVars) {
+ this.projectedVars = projectedVars;
+ return this;
+ }
+
+ /**
+ * @return The variables that results are projected onto
+ */
+ public VariableOrder getProjectionVars() {
+ return projectedVars;
+ }
+
+ /**
+ * @return An instance of {@link ProjectionMetadata} built using this builder's values.
+ */
+ public ProjectionMetadata build() {
+ return new ProjectionMetadata(nodeId, varOrder, childNodeId, parentNodeId, projectedVars);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
new file mode 100644
index 0000000..b45c56c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Base visitor class for navigating a {@link FluoQuery.Builder}.
+ * The visit methods in this class provide the basic functionality
+ * for navigating between the Builders that make up the FluoQuery.Builder.
+ *
+ */
+public abstract class QueryBuilderVisitorBase {
+
+ private FluoQuery.Builder fluoBuilder;
+
+ public QueryBuilderVisitorBase(FluoQuery.Builder fluoBuilder) {
+ this.fluoBuilder = Preconditions.checkNotNull(fluoBuilder);
+ }
+
+ public void visit() {
+ this.visit(fluoBuilder.getQueryBuilder());
+ }
+
+ /**
+ * Visits the {@link FluoQuery.Builder} starting at the Metadata builder node with the given id
+ * @param nodeId - id of the node this visitor will start at
+ */
+ public void visit(String nodeId) {
+ visitNode(nodeId);
+ }
+
+ public void visit(QueryMetadata.Builder queryBuilder) {
+ visitNode(queryBuilder.getChildNodeId());
+ }
+
+ public void visit(ConstructQueryMetadata.Builder constructBuilder) {
+ visitNode(constructBuilder.getChildNodeId());
+ }
+
+ public void visit(ProjectionMetadata.Builder projectionBuilder) {
+ visitNode(projectionBuilder.getChildNodeId());
+ }
+
+ public void visit(PeriodicQueryMetadata.Builder periodicBuilder) {
+ visitNode(periodicBuilder.getChildNodeId());
+ }
+
+ public void visit(FilterMetadata.Builder filterBuilder) {
+ visitNode(filterBuilder.getChildNodeId());
+ }
+
+ public void visit(JoinMetadata.Builder joinBuilder) {
+ visitNode(joinBuilder.getLeftChildNodeId());
+ visitNode(joinBuilder.getRightChildNodeId());
+ }
+
+ public void visit(AggregationMetadata.Builder aggregationBuilder) {
+ visitNode(aggregationBuilder.getChildNodeId());
+ }
+
+ public void visit(StatementPatternMetadata.Builder statementPatternBuilder) {}
+
+ public void visitNode(String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId); // the node type is resolved from the id itself
+ try {
+ switch(type.get()) { // get() throws when no type was resolved; handled by the catch below
+ case AGGREGATION:
+ visit(fluoBuilder.getAggregateBuilder(nodeId).get());
+ break;
+ case CONSTRUCT:
+ visit(fluoBuilder.getConstructQueryBuilder(nodeId).get());
+ break;
+ case FILTER:
+ visit(fluoBuilder.getFilterBuilder(nodeId).get());
+ break;
+ case JOIN:
+ visit(fluoBuilder.getJoinBuilder(nodeId).get());
+ break;
+ case PERIODIC_QUERY:
+ visit(fluoBuilder.getPeriodicQueryBuilder(nodeId).get());
+ break;
+ case PROJECTION:
+ visit(fluoBuilder.getProjectionBuilder(nodeId).get());
+ break;
+ case QUERY:
+ visit(fluoBuilder.getQueryBuilder(nodeId).get());
+ break;
+ case STATEMENT_PATTERN:
+ visit(fluoBuilder.getStatementPatternBuilder(nodeId).get());
+ break;
+ default:
+ throw new RuntimeException("Unsupported node type: " + type.get());
+ }
+ } catch(Exception e) { // also wraps any failure raised by the visit(...) overloads invoked above
+ throw new IllegalArgumentException("Invalid Fluo Query.", e); // chain the cause so the underlying failure stays diagnosable
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index d017724..fe130fb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -20,18 +20,24 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkNotNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
+import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Objects;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
/**
- * Metadata that is specific to a Projection.
+ * Metadata for a query registered with Fluo. This metadata is for the topmost node
+ * in the {@link FluoQuery}, and it includes information about how to export results
+ * for the query.
*/
@Immutable
@DefaultAnnotation(NonNull.class)
@@ -39,6 +45,9 @@ public class QueryMetadata extends CommonNodeMetadata {
private final String sparql;
private final String childNodeId;
+ private final Set<ExportStrategy> exportStrategy;
+ private final QueryType queryType;
+ private final String exportId;
/**
* Constructs an instance of {@link QueryMetadata}.
@@ -47,15 +56,29 @@ public class QueryMetadata extends CommonNodeMetadata {
* @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
* @param sparql - The SPARQL query whose results are being updated by the Fluo app. (not null)
* @param childNodeId - The node whose results are projected to the query's SELECT variables. (not null)
+ * @param exportStrategy - Set of export strategies used for emitting results from Rya-Fluo app
*/
public QueryMetadata(
final String nodeId,
final VariableOrder varOrder,
final String sparql,
- final String childNodeId) {
+ final String childNodeId,
+ final Set<ExportStrategy> exportStrategy,
+ final QueryType queryType) {
super(nodeId, varOrder);
this.sparql = checkNotNull(sparql);
this.childNodeId = checkNotNull(childNodeId);
+ this.exportStrategy = checkNotNull(exportStrategy);
+ this.queryType = checkNotNull(queryType);
+ String[] idSplit = nodeId.split("_");
+ if(idSplit.length != 2) {
+ throw new IllegalArgumentException("Invalid Query Node Id");
+ }
+ this.exportId = idSplit[1];
+ }
+
+ public String getExportId() {
+ return exportId;
}
/**
@@ -71,14 +94,30 @@ public class QueryMetadata extends CommonNodeMetadata {
public String getChildNodeId() {
return childNodeId;
}
-
+
+ /**
+ * @return strategies used for exporting results from Rya-Fluo Application
+ */
+ public Set<ExportStrategy> getExportStrategies() {
+ return exportStrategy;
+ }
+
+ /**
+ * @return the {@link QueryType} of this query
+ */
+ public QueryType getQueryType() {
+ return queryType;
+ }
+
@Override
public int hashCode() {
return Objects.hashCode(
super.getNodeId(),
super.getVariableOrder(),
sparql,
- childNodeId);
+ childNodeId,
+ exportStrategy,
+ queryType);
}
@Override
@@ -93,6 +132,8 @@ public class QueryMetadata extends CommonNodeMetadata {
return new EqualsBuilder()
.append(sparql, queryMetadata.sparql)
.append(childNodeId, queryMetadata.childNodeId)
+ .append(exportStrategy, queryMetadata.exportStrategy)
+ .append(queryType, queryMetadata.queryType)
.isEquals();
}
return false;
@@ -109,6 +150,8 @@ public class QueryMetadata extends CommonNodeMetadata {
.append(" Variable Order: " + super.getVariableOrder() + "\n")
.append(" Child Node ID: " + childNodeId + "\n")
.append(" SPARQL: " + sparql + "\n")
+ .append(" Query Type: " + queryType + "\n")
+ .append(" Export Strategies: " + exportStrategy + "\n")
.append("}")
.toString();
}
@@ -127,12 +170,14 @@ public class QueryMetadata extends CommonNodeMetadata {
* Builds instances of {@link QueryMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder {
private String nodeId;
private VariableOrder varOrder;
private String sparql;
private String childNodeId;
+ private Set<ExportStrategy> exportStrategies;
+ private QueryType queryType;
/**
* Constructs an instance of {@link Builder}.
@@ -154,7 +199,7 @@ public class QueryMetadata extends CommonNodeMetadata {
* @param varOrder - The variable order of binding sets that are emitted by this node.
* @return This builder so that method invocations may be chained.
*/
- public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+ public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
this.varOrder = varOrder;
return this;
}
@@ -188,6 +233,37 @@ public class QueryMetadata extends CommonNodeMetadata {
return this;
}
+ /**
+ * Sets export strategies used for emitting results from Rya Fluo app
+ * @param export - Set of export strategies
+ * @return This builder so that method invocations may be chained
+ */
+ public Builder setExportStrategies(Set<ExportStrategy> export) {
+ this.exportStrategies = export;
+ return this;
+ }
+
+ /**
+ * Set query type for the given query
+ * @param queryType - {@link QueryType} of the given query
+ * @return This builder so that method invocations may be chained
+ */
+ public Builder setQueryType(QueryType queryType) {
+ this.queryType = queryType;
+ return this;
+ }
+
+ /**
+ * @return QueryType for the given query
+ */
+ public QueryType getQueryType() {
+ return queryType;
+ }
+
+
+ /**
+ * @return id of the child node of this node
+ */
public String getChildNodeId() {
return childNodeId;
}
@@ -196,7 +272,7 @@ public class QueryMetadata extends CommonNodeMetadata {
* @return An instance of {@link QueryMetadata} build using this builder's values.
*/
public QueryMetadata build() {
- return new QueryMetadata(nodeId, varOrder, sparql, childNodeId);
+ return new QueryMetadata(nodeId, varOrder, sparql, childNodeId, exportStrategies, queryType);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
new file mode 100644
index 0000000..ce9b02c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public abstract class QueryMetadataVisitorBase {
+
+ private FluoQuery fluoQuery;
+
+ public QueryMetadataVisitorBase(FluoQuery fluoQuery) {
+ this.fluoQuery = Preconditions.checkNotNull(fluoQuery);
+ }
+
+ public void visit() {
+ visit(fluoQuery.getQueryMetadata());
+ }
+
+ /**
+ * Visits the {@link FluoQuery} starting at the Metadata node with the given id
+ * @param nodeId - id of the node this visitor will start at
+ */
+ public void visit(String nodeId) {
+ visitNode(nodeId);
+ }
+
+ public void visit(QueryMetadata queryMetadata) {
+ visitNode(queryMetadata.getChildNodeId());
+ }
+
+ public void visit(ConstructQueryMetadata constructMetadata) {
+ visitNode(constructMetadata.getChildNodeId());
+ }
+
+ public void visit(ProjectionMetadata projectionMetadata) {
+ visitNode(projectionMetadata.getChildNodeId());
+ }
+
+ public void visit(PeriodicQueryMetadata periodicMetadata) {
+ visitNode(periodicMetadata.getChildNodeId());
+ }
+
+ public void visit(FilterMetadata filterMetadata) {
+ visitNode(filterMetadata.getChildNodeId());
+ }
+
+ public void visit(JoinMetadata joinMetadata) {
+ visitNode(joinMetadata.getLeftChildNodeId());
+ visitNode(joinMetadata.getRightChildNodeId());
+ }
+
+ public void visit(AggregationMetadata aggregationMetadata) {
+ visitNode(aggregationMetadata.getChildNodeId());
+ }
+
+ public void visit(StatementPatternMetadata statementPatternMetadata) {}
+
+ public void visitNode(String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId); // the node type is resolved from the id itself
+ try {
+ switch(type.get()) { // get() throws when no type was resolved; handled by the catch below
+ case AGGREGATION:
+ visit(fluoQuery.getAggregationMetadata(nodeId).get());
+ break;
+ case CONSTRUCT:
+ visit(fluoQuery.getConstructQueryMetadata(nodeId).get());
+ break;
+ case FILTER:
+ visit(fluoQuery.getFilterMetadata(nodeId).get());
+ break;
+ case JOIN:
+ visit(fluoQuery.getJoinMetadata(nodeId).get());
+ break;
+ case PERIODIC_QUERY:
+ visit(fluoQuery.getPeriodicQueryMetadata(nodeId).get());
+ break;
+ case PROJECTION:
+ visit(fluoQuery.getProjectionMetadata(nodeId).get());
+ break;
+ case QUERY:
+ visit(fluoQuery.getQueryMetadata(nodeId).get());
+ break;
+ case STATEMENT_PATTERN:
+ visit(fluoQuery.getStatementPatternMetadata(nodeId).get());
+ break;
+ default:
+ throw new RuntimeException("Unsupported node type: " + type.get());
+ }
+ } catch(Exception e) { // also wraps any failure raised by the visit(...) overloads invoked above
+ throw new IllegalArgumentException("Invalid Fluo Query.", e); // chain the cause so the underlying failure stays diagnosable
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 8e348f2..6c03be1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -25,10 +25,11 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,16 +43,22 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.openrdf.model.Value;
import org.openrdf.model.impl.BNodeImpl;
+import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.AggregateOperator;
import org.openrdf.query.algebra.BNodeGenerator;
import org.openrdf.query.algebra.Extension;
@@ -75,6 +82,7 @@ import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -90,32 +98,86 @@ import net.jcip.annotations.Immutable;
*/
public class SparqlFluoQueryBuilder {
+ private String sparql;
+ private TupleExpr te;
+ private String queryId;
+ private NodeIds nodeIds;
+ //Default behavior is to export to Kafka - subject to change when user can
+ //specify their own export strategy
+ private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
+
+ public SparqlFluoQueryBuilder setSparql(String sparql) {
+ this.sparql = Preconditions.checkNotNull(sparql);
+ return this;
+ }
+
+ public SparqlFluoQueryBuilder setTupleExpr(TupleExpr te) {
+ this.te = Preconditions.checkNotNull(te);
+ return this;
+ }
+
/**
- * Creates the {@link FluoQuery} metadata that is required by the Fluo
- * application to process a SPARQL query.
- *
- * @param parsedQuery - The query metadata will be derived from. (not null)
- * @param nodeIds - The NodeIds object is passed in so that other parts
- * of the application may look up which ID is associated with each
- * node of the query.
- * @return A {@link FluoQuery} object loaded with metadata built from the
- * {@link ParsedQuery}.
+ * Sets the FluoQuery id as generated by {@link NodeType#generateNewFluoIdForType(NodeType)} or
+ * {@link NodeType#generateNewIdForType(NodeType, String)}, where NodeType is of type Query.
+ * @param queryId for the {@link FluoQuery}
+ * @return SparqlFluoQueryBuilder for chaining method calls
*/
- public FluoQuery make(final ParsedQuery parsedQuery, final NodeIds nodeIds) {
- checkNotNull(parsedQuery);
-
- final String sparql = parsedQuery.getSourceString();
- final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
-
- final NewQueryVisitor visitor = new NewQueryVisitor(sparql, fluoQueryBuilder, nodeIds);
- TupleExpr te = parsedQuery.getTupleExpr();
+ public SparqlFluoQueryBuilder setFluoQueryId(String queryId) {
+ this.queryId = Preconditions.checkNotNull(queryId);
+ return this;
+ }
+
+ public SparqlFluoQueryBuilder setNodeIds(NodeIds nodeIds) {
+ this.nodeIds = Preconditions.checkNotNull(nodeIds);
+ return this;
+ }
+
+ public SparqlFluoQueryBuilder setExportStrategies(Set<ExportStrategy> exportStrategies) {
+ this.exportStrategies = exportStrategies;
+ return this;
+ }
+
+ public FluoQuery build() {
+ Preconditions.checkNotNull(sparql);
+ Preconditions.checkNotNull(queryId);
+ Preconditions.checkNotNull(exportStrategies);
+
+ if(nodeIds == null) {
+ nodeIds = new NodeIds();
+ }
+
+ if(te == null) {
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq;
+ try {
+ pq = parser.parseQuery(sparql, null);
+ } catch (MalformedQueryException e) {
+ throw new RuntimeException(e);
+ }
+ te = pq.getTupleExpr();
+ }
+
PeriodicQueryUtil.placePeriodicQueryNode(te);
+ String childNodeId = nodeIds.getOrMakeId(te);
+
+ final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
+ QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
+ //sets {@link QueryType} and VariableOrder
+ setVarOrderAndQueryType(queryBuilder, te);
+ queryBuilder.setSparql(sparql);
+ queryBuilder.setChildNodeId(childNodeId);
+ queryBuilder.setExportStrategies(exportStrategies);
+ fluoQueryBuilder.setQueryMetadata(queryBuilder);
+
+ setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
+
+ final NewQueryVisitor visitor = new NewQueryVisitor(fluoQueryBuilder, nodeIds);
te.visit( visitor );
-
+
final FluoQuery fluoQuery = fluoQueryBuilder.build();
return fluoQuery;
}
-
+
/**
* A data structure that creates and keeps track of Node IDs for the nodes
* of a {@link ParsedQuery}. This structure should only be used while creating
@@ -187,7 +249,7 @@ public class SparqlFluoQueryBuilder {
} else if (node instanceof Join || node instanceof LeftJoin) {
prefix = JOIN_PREFIX;
} else if (node instanceof Projection) {
- prefix = QUERY_PREFIX;
+ prefix = PROJECTION_PREFIX;
} else if(node instanceof Extension) {
prefix = AGGREGATION_PREFIX;
} else if (node instanceof Reduced) {
@@ -214,7 +276,6 @@ public class SparqlFluoQueryBuilder {
private final NodeIds nodeIds;
private final FluoQuery.Builder fluoQueryBuilder;
- private final String sparql;
/**
* Constructs an instance of {@link NewQueryVisitor}.
@@ -227,8 +288,7 @@ public class SparqlFluoQueryBuilder {
* of the application may look up which ID is associated with each
* node of the query.
*/
- public NewQueryVisitor(final String sparql, final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) {
- this.sparql = checkNotNull(sparql);
+ public NewQueryVisitor(final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) {
this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder);
this.nodeIds = checkNotNull(nodeIds);
}
@@ -256,6 +316,7 @@ public class SparqlFluoQueryBuilder {
} else {
groupByVariableOrder = new VariableOrder();
}
+
// The aggregations that need to be performed are the Group Elements.
final List<AggregationElement> aggregations = new ArrayList<>();
@@ -289,15 +350,21 @@ public class SparqlFluoQueryBuilder {
aggregationBuilder.setChildNodeId(childNodeId);
aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder);
+
+ Set<String> aggregationVars = getVarsToDelete(groupByVariableOrder.getVariableOrders(), aggregationBuilder.getVariableOrder().getVariableOrders());
+ FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.DeleteVariable, Lists.newArrayList(aggregationVars), aggregationId);
+
for(final AggregationElement aggregation : aggregations) {
aggregationBuilder.addAggregation(aggregation);
}
+
+
// Update the child node's metadata.
final Set<String> childVars = getVars(child);
final VariableOrder childVarOrder = new VariableOrder(childVars);
- setChildMetadata(childNodeId, childVarOrder, aggregationId);
+ setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, aggregationId);
}
// Walk to the next node.
@@ -369,11 +436,11 @@ public class SparqlFluoQueryBuilder {
// Create or update the left child's variable order and parent node id.
final VariableOrder leftVarOrder = varOrders.getLeftVarOrder();
- setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId);
+ setChildMetadata(fluoQueryBuilder, leftChildNodeId, leftVarOrder, joinNodeId);
// Create or update the right child's variable order and parent node id.
final VariableOrder rightVarOrder = varOrders.getRightVarOrder();
- setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId);
+ setChildMetadata(fluoQueryBuilder, rightChildNodeId, rightVarOrder, joinNodeId);
}
@Override
@@ -407,7 +474,7 @@ public class SparqlFluoQueryBuilder {
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr)child);
final VariableOrder childVarOrder = new VariableOrder(childVars);
- setChildMetadata(childNodeId, childVarOrder, filterId);
+ setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, filterId);
// Walk to the next node.
super.meet(node);
@@ -442,12 +509,12 @@ public class SparqlFluoQueryBuilder {
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr) child);
final VariableOrder childVarOrder = new VariableOrder(childVars);
- setChildMetadata(childNodeId, childVarOrder, periodicId);
+ setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, periodicId);
// update variable order of this node and all ancestors to
// include BIN_ID binding as
// first variable in the ordering
- PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId);
+ FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.AddVariable, Arrays.asList(IncrementalUpdateConstants.PERIODIC_BIN_ID), periodicId);
// Walk to the next node.
node.getArg().visit(this);
}
@@ -458,13 +525,12 @@ public class SparqlFluoQueryBuilder {
public void meet(final Projection node) {
// Create a builder for this node populated with the metadata.
final String queryId = nodeIds.getOrMakeId(node);
- final VariableOrder queryVarOrder = new VariableOrder(node.getBindingNames());
-
- final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
- fluoQueryBuilder.setQueryMetadata(queryBuilder);
- queryBuilder.setSparql(sparql);
- queryBuilder.setVariableOrder(queryVarOrder);
+ ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(queryId).orNull();
+ if (projectionBuilder == null) {
+ projectionBuilder = ProjectionMetadata.builder(queryId);
+ fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
+ }
final QueryModelNode child = node.getArg();
if(child == null) {
@@ -472,13 +538,14 @@ public class SparqlFluoQueryBuilder {
}
final String childNodeId = nodeIds.getOrMakeId(child);
- queryBuilder.setChildNodeId(childNodeId);
+ projectionBuilder.setChildNodeId(childNodeId);
+ projectionBuilder.setProjectedVars(projectionBuilder.getVariableOrder());
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr)child);
final VariableOrder childVarOrder = new VariableOrder(childVars);
- setChildMetadata(childNodeId, childVarOrder, queryId);
+ setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, queryId);
// Walk to the next node.
super.meet(node);
@@ -489,10 +556,13 @@ public class SparqlFluoQueryBuilder {
//create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata
//builder with FluoQueryBuilder, and add metadata that we currently have
final String constructId = nodeIds.getOrMakeId(node);
- final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder();
- constructBuilder.setNodeId(constructId);
- fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
- constructBuilder.setSparql(sparql);
+
+ ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
+ if(constructBuilder == null) {
+ constructBuilder = ConstructQueryMetadata.builder();
+ constructBuilder.setNodeId(constructId);
+ fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+ }
//get child node
QueryModelNode child = node.getArg();
@@ -531,96 +601,12 @@ public class SparqlFluoQueryBuilder {
// Update the child node's metadata.
final Set<String> childVars = getVars((TupleExpr)child);
final VariableOrder childVarOrder = new VariableOrder(childVars);
- setChildMetadata(childNodeId, childVarOrder, constructId);
+ setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, constructId);
//fast forward visitor to next node we care about
child.visit(this);
}
-
- /**
- * Update a query node's metadata to include it's binding set variable order
- * and it's parent node id. This information is only known when handling
- * the parent node.
- *
- * @param childNodeId - The node ID of the child node.
- * @param childVarOrder - The variable order of the child node's binding sets.
- * @param parentNodeId - The node ID that consumes the child's binding sets.
- */
- private void setChildMetadata(final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
- checkNotNull(childNodeId);
- checkNotNull(childVarOrder);
- checkNotNull(parentNodeId);
-
- final NodeType childType = NodeType.fromNodeId(childNodeId).get();
- switch (childType) {
- case STATEMENT_PATTERN:
- StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
- if (spBuilder == null) {
- spBuilder = StatementPatternMetadata.builder(childNodeId);
- fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
- }
-
- spBuilder.setVarOrder(childVarOrder);
- spBuilder.setParentNodeId(parentNodeId);
- break;
-
- case JOIN:
- JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
- if (joinBuilder == null) {
- joinBuilder = JoinMetadata.builder(childNodeId);
- fluoQueryBuilder.addJoinMetadata(joinBuilder);
- }
-
- joinBuilder.setVariableOrder(childVarOrder);
- joinBuilder.setParentNodeId(parentNodeId);
- break;
-
- case FILTER:
- FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
- if (filterBuilder == null) {
- filterBuilder = FilterMetadata.builder(childNodeId);
- fluoQueryBuilder.addFilterMetadata(filterBuilder);
- }
-
- filterBuilder.setVarOrder(childVarOrder);
- filterBuilder.setParentNodeId(parentNodeId);
- break;
-
- case AGGREGATION:
- AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
- if (aggregationBuilder == null) {
- aggregationBuilder = AggregationMetadata.builder(childNodeId);
- fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
- }
-
- aggregationBuilder.setVariableOrder(childVarOrder);
- aggregationBuilder.setParentNodeId(parentNodeId);
- break;
-
- case QUERY:
- throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
-
- case CONSTRUCT:
- throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
-
- case PERIODIC_QUERY:
- PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
- if (periodicQueryBuilder == null) {
- periodicQueryBuilder = PeriodicQueryMetadata.builder();
- periodicQueryBuilder.setNodeId(childNodeId);
- fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
- }
- periodicQueryBuilder.setVarOrder(childVarOrder);
- periodicQueryBuilder.setParentNodeId(parentNodeId);
- break;
-
- default:
- throw new IllegalArgumentException("Unsupported NodeType: " + childType);
-
- }
- }
-
private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) {
Map<String, Value> valueMap = new HashMap<>();
//create valueMap to associate source names with Values
@@ -654,6 +640,13 @@ public class SparqlFluoQueryBuilder {
return new ConstructGraph(constructProj);
}
+ private Set<String> getVarsToDelete(Collection<String> groupByVars, Collection<String> varOrderVars) {
+ Set<String> groupBySet = Sets.newHashSet(groupByVars);
+ Set<String> varOrderSet = Sets.newHashSet(varOrderVars);
+
+ return Sets.difference(varOrderSet, groupBySet);
+ }
+
private void validateProjectionElemList(ProjectionElemList list) {
List<ProjectionElem> elements = list.getElements();
checkArgument(elements.size() == 3);
@@ -662,8 +655,6 @@ public class SparqlFluoQueryBuilder {
checkArgument(elements.get(2).getTargetName().equals("object"));
}
-
-
/**
* Get the non-constant variables from a {@link TupleExpr}.
*
@@ -764,4 +755,199 @@ public class SparqlFluoQueryBuilder {
return shifted;
}
}
+
+ private void setVarOrderAndQueryType(QueryMetadata.Builder builder, TupleExpr te) {
+ QueryMetadataLocator locator = new QueryMetadataLocator();
+ try {
+ te.visit(locator);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ builder.setVarOrder(locator.getVarOrder());
+ builder.setQueryType(locator.getQueryType());
+ }
+
+    /**
+     * Visitor that records the {@link VariableOrder} and {@link QueryType} of a query
+     * while walking its tuple expression.
+     * <p>
+     * A {@link Projection} or {@link Reduced} node only assigns a value that is still
+     * unset, so the first such node that is met wins. A {@link PeriodicQueryNode}
+     * always sets the query type to {@link QueryType#Periodic}, replacing any type
+     * recorded earlier, and is not handed on to the superclass.
+     */
+    public static class QueryMetadataLocator extends QueryModelVisitorBase<Exception> {
+
+        private VariableOrder varOrder;
+        private QueryType queryType;
+
+        /**
+         * @return the variable order that was located, or {@code null} if no
+         *         Projection or Reduced node has been visited
+         */
+        public VariableOrder getVarOrder() {
+            return varOrder;
+        }
+
+        /**
+         * @return the query type that was located, or {@code null} if no matching
+         *         node has been visited
+         */
+        public QueryType getQueryType() {
+            return queryType;
+        }
+
+        /**
+         * Records the projection's binding names as the variable order and marks the
+         * query as a projection query, unless those values were already recorded.
+         */
+        @Override
+        public void meet(Projection node) throws Exception {
+            if (varOrder == null) {
+                final Set<String> bindingNames = node.getBindingNames();
+                varOrder = new VariableOrder(bindingNames);
+            }
+
+            if (queryType == null) {
+                queryType = QueryType.Projection;
+            }
+            super.meet(node);
+        }
+
+        /**
+         * Records the construct graph's variable order and marks the query as a
+         * construct query, unless those values were already recorded.
+         */
+        @Override
+        public void meet(Reduced node) throws Exception {
+            if (varOrder == null) {
+                varOrder = getConstructGraphVarOrder(node);
+            }
+
+            if (queryType == null) {
+                queryType = QueryType.Construct;
+            }
+            super.meet(node);
+        }
+
+        /**
+         * Marks the query as periodic when a {@link PeriodicQueryNode} is met; every
+         * other node is delegated to the superclass.
+         */
+        @Override
+        public void meetOther(final QueryModelNode node) throws Exception {
+            if (node instanceof PeriodicQueryNode) {
+                // Deliberately unconditional: overrides a type set by an enclosing node.
+                queryType = QueryType.Periodic;
+            } else {
+                super.meetOther(node);
+            }
+        }
+    }
+
+    /**
+     * Builds the variable order of a construct query from the projection(s) found
+     * directly beneath the given {@link Reduced} node.
+     *
+     * @param node - the Reduced node whose argument is a Projection or MultiProjection
+     * @return the variable order derived from the projection element lists
+     * @throws IllegalArgumentException if the argument is neither a Projection nor a MultiProjection
+     */
+    private static VariableOrder getConstructGraphVarOrder(Reduced node) {
+        final QueryModelNode arg = node.getArg();
+        Preconditions.checkArgument(arg instanceof Projection || arg instanceof MultiProjection);
+
+        // Gather the element list(s): one for a Projection, several for a MultiProjection.
+        final List<ProjectionElemList> elemLists = new ArrayList<>();
+        if (arg instanceof MultiProjection) {
+            elemLists.addAll(((MultiProjection) arg).getProjections());
+        } else {
+            elemLists.add(((Projection) arg).getProjectionElemList());
+        }
+
+        return getConstructGraphVarOrder(elemLists);
+    }
+
+ private static VariableOrder getConstructGraphVarOrder(List<ProjectionElemList> projections) {
+ Set<String> varOrders = new HashSet<>();
+
+ for(ProjectionElemList elems: projections) {
+ for(ProjectionElem elem: elems.getElements()) {
+ String name = elem.getSourceName();
+ if(!name.startsWith("-const-") && !name.startsWith("-anon-")) {
+ varOrders.add(name);
+ }
+ }
+ }
+
+ return new VariableOrder(varOrders);
+ }
+
+
+    /**
+     * Updates a query node's metadata to include its binding set variable order
+     * and its parent node id. This information is only known when handling
+     * the parent node.
+     * <p>
+     * If the builder does not yet hold a metadata builder for the child, one is
+     * created and registered before the values are set.
+     *
+     * @param fluoQueryBuilder - Builder whose metadata is updated
+     * @param childNodeId - The node ID of the child node.
+     * @param childVarOrder - The variable order of the child node's binding sets.
+     * @param parentNodeId - The node ID that consumes the child's binding sets.
+     * @throws NullPointerException if any argument is {@code null}
+     * @throws IllegalArgumentException if the child is a QUERY node, has an unsupported
+     *         node type, or is a CONSTRUCT node whose id differs from the construct
+     *         builder already registered
+     */
+    private static void setChildMetadata(final FluoQuery.Builder fluoQueryBuilder, final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
+        checkNotNull(fluoQueryBuilder);
+        checkNotNull(childNodeId);
+        checkNotNull(childVarOrder);
+        checkNotNull(parentNodeId);
+
+        // NOTE(review): get() on the Optional presumably fails when the id carries no
+        // recognised node type prefix - confirm against NodeType.fromNodeId.
+        final NodeType childType = NodeType.fromNodeId(childNodeId).get();
+        switch (childType) {
+            case STATEMENT_PATTERN:
+                StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
+                if (spBuilder == null) {
+                    spBuilder = StatementPatternMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
+                }
+
+                spBuilder.setVarOrder(childVarOrder);
+                spBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case JOIN:
+                JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
+                if (joinBuilder == null) {
+                    joinBuilder = JoinMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addJoinMetadata(joinBuilder);
+                }
+
+                joinBuilder.setVarOrder(childVarOrder);
+                joinBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case FILTER:
+                FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
+                if (filterBuilder == null) {
+                    filterBuilder = FilterMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addFilterMetadata(filterBuilder);
+                }
+
+                filterBuilder.setVarOrder(childVarOrder);
+                filterBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case AGGREGATION:
+                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+                if (aggregationBuilder == null) {
+                    aggregationBuilder = AggregationMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+                }
+
+                aggregationBuilder.setVarOrder(childVarOrder);
+                aggregationBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case PROJECTION:
+                ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(childNodeId).orNull();
+                if (projectionBuilder == null) {
+                    projectionBuilder = ProjectionMetadata.builder(childNodeId);
+                    fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
+                }
+
+                projectionBuilder.setVarOrder(childVarOrder);
+                projectionBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case QUERY:
+                throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+
+            case CONSTRUCT:
+                ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
+                if (constructBuilder == null) {
+                    constructBuilder = ConstructQueryMetadata.builder();
+                    constructBuilder.setNodeId(childNodeId);
+                    fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
+                }
+
+                // Only one construct builder is held; it must be the one for this child.
+                Preconditions.checkArgument(childNodeId.equals(constructBuilder.getNodeId()));
+                constructBuilder.setVarOrder(childVarOrder);
+                constructBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            case PERIODIC_QUERY:
+                // NOTE(review): unlike CONSTRUCT, an existing periodic builder is not checked
+                // against childNodeId before it is updated - confirm this is intended.
+                PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+                if (periodicQueryBuilder == null) {
+                    periodicQueryBuilder = PeriodicQueryMetadata.builder();
+                    periodicQueryBuilder.setNodeId(childNodeId);
+                    fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
+                }
+                periodicQueryBuilder.setVarOrder(childVarOrder);
+                periodicQueryBuilder.setParentNodeId(parentNodeId);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+        }
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
index 7de10d5..beead93 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java
@@ -127,7 +127,7 @@ public class StatementPatternMetadata extends CommonNodeMetadata {
* Builds instances of {@link StatementPatternMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder {
private final String nodeId;
private VariableOrder varOrder;
@@ -160,6 +160,11 @@ public class StatementPatternMetadata extends CommonNodeMetadata {
this.varOrder = varOrder;
return this;
}
+
+ /**
+ * Returns the variable order currently held by this builder.
+ * NOTE(review): the field has no visible initializer, so this is presumably
+ * {@code null} until a variable order has been set - confirm.
+ *
+ * @return the builder's variable order
+ */
+ @Override
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
/**
* Sets the statement pattern new statements are matched against.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
new file mode 100644
index 0000000..303f9bb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for manipulating components of a {@link FluoQuery}.
+ */
+public class FluoQueryUtils {
+
+    /**
+     * Updates the {@code VariableOrder}s of a given {@link FluoQuery.Builder} by running
+     * a {@link VariableOrderUpdateVisitor} over it. The builder is modified in place and
+     * returned for convenience.
+     *
+     * @param builder - builder whose VariableOrders will be updated
+     * @param action - add or delete variables
+     * @param variables - variables to be added or deleted
+     * @param stopNodeId - node to stop at
+     * @return the same FluoQuery.Builder, with updated VariableOrders
+     */
+    public static FluoQuery.Builder updateVarOrders(FluoQuery.Builder builder, UpdateAction action, List<String> variables, String stopNodeId) {
+        final VariableOrderUpdateVisitor visitor = new VariableOrderUpdateVisitor(builder, action, variables, stopNodeId);
+        visitor.visit();
+
+        return builder;
+    }
+
+    /**
+     * Converts the fluo query id to a pcj id.
+     *
+     * @param fluoQueryId - query id of the form query_prefix + _ + UUID
+     * @return the pcj id, which consists of only the UUID portion of the fluo query id
+     * @throws NullPointerException if {@code fluoQueryId} is {@code null}
+     * @throws IllegalArgumentException if splitting the id on the query prefix does not
+     *         yield exactly two parts with a non-empty second part
+     */
+    public static String convertFluoQueryIdToPcjId(String fluoQueryId) {
+        Preconditions.checkNotNull(fluoQueryId, "fluoQueryId must not be null");
+        // NOTE(review): split() treats the prefix as a regex and does not require it to be
+        // at the start of the id - confirm whether callers depend on that leniency.
+        final String[] queryIdParts = fluoQueryId.split(IncrementalUpdateConstants.QUERY_PREFIX + "_");
+        // split() never yields null elements, so only the count and emptiness are checked.
+        Preconditions.checkArgument(queryIdParts.length == 2 && !queryIdParts[1].isEmpty(),
+                "Invalid Fluo query id '%s'; expected the form %s_<UUID>", fluoQueryId, IncrementalUpdateConstants.QUERY_PREFIX);
+        return queryIdParts[1];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
index fd24af2..406ba4c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
@@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.util;
import static com.google.common.base.Preconditions.checkArgument;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -30,13 +29,8 @@ import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.openrdf.model.Literal;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
@@ -194,72 +188,6 @@ public class PeriodicQueryUtil {
}
/**
- * Adds the variable "periodicBinId" to the beginning of all {@link VariableOrder}s for the
- * Metadata nodes that appear above the PeriodicQueryNode. This ensures that the binId is
- * written first in the Row so that bins can be easily scanned and deleted.
- * @param builder
- * @param nodeId
- */
- public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, String nodeId) {
- NodeType type = NodeType.fromNodeId(nodeId).orNull();
- if (type == null) {
- throw new IllegalArgumentException("NodeId must be associated with an existing MetadataBuilder.");
- }
- switch (type) {
- case AGGREGATION:
- AggregationMetadata.Builder aggBuilder = builder.getAggregateBuilder(nodeId).orNull();
- if (aggBuilder != null) {
- VariableOrder varOrder = aggBuilder.getVariableOrder();
- VariableOrder groupOrder = aggBuilder.getGroupByVariableOrder();
- // update varOrder with BIN_ID
- List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
- orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
- aggBuilder.setVariableOrder(new VariableOrder(orderList));
- // update groupVarOrder with BIN_ID
- List<String> groupOrderList = new ArrayList<>(groupOrder.getVariableOrders());
- groupOrderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
- aggBuilder.setGroupByVariableOrder(new VariableOrder(groupOrderList));
- // recursive call to update the VariableOrders of all ancestors
- // of this node
- updateVarOrdersToIncludeBin(builder, aggBuilder.getParentNodeId());
- } else {
- throw new IllegalArgumentException("There is no AggregationMetadata.Builder for the indicated Id.");
- }
- break;
- case PERIODIC_QUERY:
- PeriodicQueryMetadata.Builder periodicBuilder = builder.getPeriodicQueryBuilder().orNull();
- if (periodicBuilder != null && periodicBuilder.getNodeId().equals(nodeId)) {
- VariableOrder varOrder = periodicBuilder.getVarOrder();
- List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
- orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
- periodicBuilder.setVarOrder(new VariableOrder(orderList));
- // recursive call to update the VariableOrders of all ancestors
- // of this node
- updateVarOrdersToIncludeBin(builder, periodicBuilder.getParentNodeId());
- } else {
- throw new IllegalArgumentException(
- "PeriodicQueryMetadata.Builder id does not match the indicated id. A query cannot have more than one PeriodicQueryMetadata Node.");
- }
- break;
- case QUERY:
- QueryMetadata.Builder queryBuilder = builder.getQueryBuilder().orNull();
- if (queryBuilder != null && queryBuilder.getNodeId().equals(nodeId)) {
- VariableOrder varOrder = queryBuilder.getVariableOrder();
- List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
- orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
- queryBuilder.setVariableOrder(new VariableOrder(orderList));
- } else {
- throw new IllegalArgumentException(
- "QueryMetadata.Builder id does not match the indicated id. A query cannot have more than one QueryMetadata Node.");
- }
- break;
- default:
- throw new IllegalArgumentException(
- "Incorrectly positioned PeriodicQueryNode. The PeriodicQueryNode can only be positioned below Projections, Extensions, and ConstructQueryNodes.");
- }
- }
-
- /**
* Collects all Metadata node Ids that are ancestors of the PeriodicQueryNode and contain the variable
* {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}.
* @param sx - Fluo Snapshot for scanning Fluo
@@ -277,6 +205,10 @@ public class PeriodicQueryUtil {
case PERIODIC_QUERY:
ids.add(nodeId);
break;
+ case PROJECTION:
+ ids.add(nodeId);
+ getPeriodicQueryNodeAncestorIds(sx, sx.get( Bytes.of(nodeId), FluoQueryColumns.PROJECTION_CHILD_NODE_ID).toString(), ids);
+ break;
case QUERY:
ids.add(nodeId);
getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
new file mode 100644
index 0000000..f433849
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryBuilderVisitorBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Visitor that traverses a {@link FluoQuery.Builder} and performs the indicated {@link UpdateAction}
+ * on the {@link VariableOrder}s of each node using a provided list of variables. The visitor
+ * either adds the provided list of variables to the VariableOrder of each node or deletes the
+ * provided variables from the VariableOrder of each node.
+ *
+ */
+public class VariableOrderUpdateVisitor extends QueryBuilderVisitorBase {
+
+ /**
+ * Enum class indicating whether to add or delete variables from
+ * the VariableOrders of nodes in the FluoQuery.
+ *
+ */
+ public static enum UpdateAction {
+ AddVariable, DeleteVariable
+ };
+
+ private UpdateAction action;
+ private List<String> variables;
+ private String stopNodeId;
+
+ /**
+ * Creates a VariableOrderUpdateVisitor to update the variables in a given FluoQuery.Builder
+ * @param fluoBuilder - builder whose VariableOrder will be updated
+ * @param action - either add or delete
+ * @param variables - variables to be added or deleted
+ * @param stopNodeId - indicates the builder node to stop at
+ */
+ public VariableOrderUpdateVisitor(FluoQuery.Builder fluoBuilder, UpdateAction action, List<String> variables, String stopNodeId) {
+ super(fluoBuilder);
+ this.action = Preconditions.checkNotNull(action);
+ this.variables = Preconditions.checkNotNull(variables);
+ this.stopNodeId = Preconditions.checkNotNull(stopNodeId);
+ }
+
+ public void visit(QueryMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(ProjectionMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(action == UpdateAction.AddVariable) {
+ builder.setProjectedVars(updateOrder(builder.getProjectionVars()));
+ }
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(ConstructQueryMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(FilterMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(PeriodicQueryMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(JoinMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(AggregationMetadata.Builder builder) {
+ builder.setVarOrder(updateOrder(builder.getVariableOrder()));
+ builder.setGroupByVariableOrder(updateOrder(builder.getGroupByVariableOrder()));
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ public void visit(StatementPatternMetadata.Builder builder) {
+ if(!atStopNode(builder.getNodeId())) {
+ super.visit(builder);
+ }
+ }
+
+ boolean atStopNode(String nodeId) {
+ return nodeId.equals(stopNodeId);
+ }
+
+ private VariableOrder updateOrder(VariableOrder varOrder) {
+
+ switch (action) {
+ case AddVariable:
+ varOrder = addBindingToOrder(varOrder);
+ break;
+ case DeleteVariable:
+ varOrder = deleteBindingFromOrder(varOrder);
+ break;
+ }
+ return varOrder;
+ }
+
+ private VariableOrder addBindingToOrder(VariableOrder varOrder) {
+ List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
+ orderList.addAll(0, variables);
+ return new VariableOrder(orderList);
+ }
+
+ private VariableOrder deleteBindingFromOrder(VariableOrder varOrder) {
+ List<String> vars = new ArrayList<>();
+ varOrder.getVariableOrders().forEach(x -> {
+ if (!variables.contains(x) || x.equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
+ vars.add(x);
+ }
+ });
+ return new VariableOrder(vars);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
index c8ca6af..b40ba3f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor;
@@ -162,17 +162,17 @@ public class PeriodicQueryUtilTest {
+ "?obs <uri:hasTime> ?time. " //n
+ "?obs <uri:hasLattitude> ?lat }"; //n
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq = parser.parseQuery(query, null);
SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
- FluoQuery fluoQuery = builder.make(pq, new NodeIds());
+ builder.setSparql(query);
+ builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+ FluoQuery fluoQuery = builder.build();
PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull();
Assert.assertEquals(true, periodicMeta != null);
VariableOrder periodicVars = periodicMeta.getVariableOrder();
Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0));
- QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get();
+ QueryMetadata queryMeta = fluoQuery.getQueryMetadata();
VariableOrder queryVars = queryMeta.getVariableOrder();
Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
new file mode 100644
index 0000000..b432868
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link QueryBuilderVisitorBase} subclass visits the builders
+ * registered with a {@link FluoQuery.Builder} in the order query, projection,
+ * join, left statement pattern, right statement pattern.
+ */
+public class QueryBuilderVisitorTest {
+
+    @Test
+    public void builderTest() {
+
+        final FluoQuery.Builder fluoBuilder = FluoQuery.builder();
+
+        final String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
+        final String projectionId = NodeType.generateNewFluoIdForType(NodeType.PROJECTION);
+        final String joinId = NodeType.generateNewFluoIdForType(NodeType.JOIN);
+        final String leftSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        final String rightSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+
+        final List<String> expected = Arrays.asList(queryId, projectionId, joinId, leftSp, rightSp);
+
+        // Wire the builders into a chain: query -> projection -> join -> (left, right).
+        final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
+        queryBuilder.setChildNodeId(projectionId);
+
+        final ProjectionMetadata.Builder projectionBuilder = ProjectionMetadata.builder(projectionId);
+        projectionBuilder.setChildNodeId(joinId);
+
+        final JoinMetadata.Builder joinBuilder = JoinMetadata.builder(joinId);
+        joinBuilder.setLeftChildNodeId(leftSp);
+        joinBuilder.setRightChildNodeId(rightSp);
+
+        final StatementPatternMetadata.Builder left = StatementPatternMetadata.builder(leftSp);
+        final StatementPatternMetadata.Builder right = StatementPatternMetadata.builder(rightSp);
+
+        fluoBuilder.setQueryMetadata(queryBuilder);
+        fluoBuilder.addProjectionBuilder(projectionBuilder);
+        fluoBuilder.addJoinMetadata(joinBuilder);
+        fluoBuilder.addStatementPatternBuilder(left);
+        fluoBuilder.addStatementPatternBuilder(right);
+
+        final QueryBuilderPrinter printer = new QueryBuilderPrinter(fluoBuilder);
+        printer.visit();
+        Assert.assertEquals(expected, printer.getIds());
+    }
+
+
+    /**
+     * Visitor that prints the node id of every builder it visits to standard
+     * out and records the ids in visit order so the order can be asserted.
+     */
+    public static class QueryBuilderPrinter extends QueryBuilderVisitorBase {
+
+        private final List<String> ids = new ArrayList<>();
+
+        public QueryBuilderPrinter(FluoQuery.Builder builder) {
+            super(builder);
+        }
+
+        /**
+         * @return the node ids recorded so far, in the order they were visited
+         */
+        public List<String> getIds() {
+            return ids;
+        }
+
+        @Override
+        public void visit(QueryMetadata.Builder queryBuilder) {
+            printAndCollect(queryBuilder.getNodeId());
+            super.visit(queryBuilder);
+        }
+
+        @Override
+        public void visit(ProjectionMetadata.Builder projectionBuilder) {
+            printAndCollect(projectionBuilder.getNodeId());
+            super.visit(projectionBuilder);
+        }
+
+        @Override
+        public void visit(JoinMetadata.Builder joinBuilder) {
+            printAndCollect(joinBuilder.getNodeId());
+            super.visit(joinBuilder);
+        }
+
+        @Override
+        public void visit(StatementPatternMetadata.Builder statementBuilder) {
+            // Only records the id; does not delegate to the superclass.
+            printAndCollect(statementBuilder.getNodeId());
+        }
+
+        /** Prints the node id and appends it to the recorded ids. */
+        private void printAndCollect(String nodeId) {
+            System.out.println(nodeId);
+            ids.add(nodeId);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
new file mode 100644
index 0000000..5c89a75
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link QueryMetadataVisitorBase} subclass visits the metadata
+ * nodes of a {@link FluoQuery} built from a periodic, aggregating SPARQL query
+ * in the order query, projection, aggregation, periodic query, join, left
+ * statement pattern, right statement pattern.
+ */
+public class QueryMetadataVisitorTest {
+
+    @Test
+    public void builderTest() {
+        final String query = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
+
+        final SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
+        builder.setSparql(query);
+        final FluoQuery fluoQuery = builder.build();
+
+        // Follow the child ids from the query node down to the statement patterns.
+        final QueryMetadata queryMetadata = fluoQuery.getQueryMetadata();
+        final String queryId = queryMetadata.getNodeId();
+        final String projectionId = queryMetadata.getChildNodeId();
+        final String aggId = fluoQuery.getProjectionMetadata(projectionId).get().getChildNodeId();
+        final String periodicId = fluoQuery.getAggregationMetadata(aggId).get().getChildNodeId();
+        final String joinId = fluoQuery.getPeriodicQueryMetadata(periodicId).get().getChildNodeId();
+        // Look the join up once and read both child ids from it.
+        final JoinMetadata joinMetadata = fluoQuery.getJoinMetadata(joinId).get();
+        final String leftSp = joinMetadata.getLeftChildNodeId();
+        final String rightSp = joinMetadata.getRightChildNodeId();
+
+        final List<String> expected = Arrays.asList(queryId, projectionId, aggId, periodicId, joinId, leftSp, rightSp);
+        final QueryMetadataVisitor visitor = new QueryMetadataVisitor(fluoQuery);
+        visitor.visit();
+
+        Assert.assertEquals(expected, visitor.getIds());
+    }
+
+
+    /**
+     * Visitor that records the node id of every metadata object it visits, in
+     * visit order, so the order can be asserted.
+     */
+    public static class QueryMetadataVisitor extends QueryMetadataVisitorBase {
+
+        private final List<String> ids = new ArrayList<>();
+
+        public QueryMetadataVisitor(FluoQuery metadata) {
+            super(metadata);
+        }
+
+        /**
+         * @return the node ids recorded so far, in the order they were visited
+         */
+        public List<String> getIds() {
+            return ids;
+        }
+
+        @Override
+        public void visit(QueryMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        @Override
+        public void visit(ProjectionMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        @Override
+        public void visit(JoinMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        @Override
+        public void visit(StatementPatternMetadata metadata) {
+            // Only records the id; does not delegate to the superclass.
+            ids.add(metadata.getNodeId());
+        }
+
+        @Override
+        public void visit(PeriodicQueryMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        @Override
+        public void visit(FilterMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+
+        @Override
+        public void visit(AggregationMetadata metadata) {
+            ids.add(metadata.getNodeId());
+            super.visit(metadata);
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 854798d..3f335f4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -41,7 +41,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest;
import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -124,7 +124,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
log.trace("SPARQL Query: " + request.getQuery());
log.trace("Var Orders: " + request.getVarOrders());
log.trace("Loading these values into the Fluo app.");
- final CreatePcj createPcj = new CreatePcj();
+ final CreateFluoPcj createPcj = new CreateFluoPcj();
try {
// Create the PCJ in Rya.
final String sparql = request.getQuery();