You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:29 UTC
[4/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 2084907..4b6f44e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -35,9 +35,13 @@ public class IncrementalUpdateConstants {
public static final String FILTER_PREFIX = "FILTER";
public static final String AGGREGATION_PREFIX = "AGGREGATION";
public static final String QUERY_PREFIX = "QUERY";
+ public static final String PROJECTION_PREFIX = "PROJECTION";
public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
+ public static enum QueryType{Construct, Projection, Periodic};
+ public static enum ExportStrategy{Rya, Kafka};
+
public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 9b65b34..0f448a6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -139,9 +139,9 @@ public class JoinResultUpdater {
// Create the Row Key for the emitted binding set. It does not contain visibilities.
final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
-
- // Only insert the join Binding Set if it is new.
- if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null) {
+
+ // Only insert the join Binding Set if it is new or BindingSet contains values not used in resultRow.
+ if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || joinVarOrder.getVariableOrders().size() < newJoinResult.size()) {
// Create the Node Value. It does contain visibilities.
final Bytes nodeValueBytes = BS_SERDE.serialize(newJoinResult);
@@ -210,18 +210,28 @@ public class JoinResultUpdater {
final NodeType nodeType = NodeType.fromNodeId(nodeId).get();
switch(nodeType) {
case STATEMENT_PATTERN:
- return queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder();
-
+ return removeBinIdFromVarOrder(queryDao.readStatementPatternMetadata(tx, nodeId).getVariableOrder());
case FILTER:
- return queryDao.readFilterMetadata(tx, nodeId).getVariableOrder();
-
+ return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, nodeId).getVariableOrder());
case JOIN:
- return queryDao.readJoinMetadata(tx, nodeId).getVariableOrder();
-
+ return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, nodeId).getVariableOrder());
+ case PROJECTION:
+ return removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, nodeId).getVariableOrder());
default:
throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId);
}
}
+
+ private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) {
+ List<String> varOrderList = varOrder.getVariableOrders();
+ if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
+ List<String> updatedVarOrderList = Lists.newArrayList(varOrderList);
+ updatedVarOrderList.remove(0);
+ return new VariableOrder(updatedVarOrderList);
+ } else {
+ return varOrder;
+ }
+ }
/**
* Assuming that the common variables between two children are already
@@ -285,6 +295,9 @@ public class JoinResultUpdater {
case JOIN:
column = FluoQueryColumns.JOIN_BINDING_SET;
break;
+ case PROJECTION:
+ column = FluoQueryColumns.PROJECTION_BINDING_SET;
+ break;
default:
throw new IllegalArgumentException("The child node's sibling is not of type StatementPattern, Join, Left Join, or Filter.");
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
index b8fc2d9..a6fc5ea 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java
@@ -24,10 +24,12 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
import java.util.List;
+import java.util.UUID;
import org.apache.fluo.api.data.Column;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -39,13 +41,14 @@ import com.google.common.base.Optional;
* Represents the different types of nodes that a Query may have.
*/
public enum NodeType {
- PERIODIC_QUERY(QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET),
- FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET),
- JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
- STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
- QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
- AGGREGATION(QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
- CONSTRUCT(QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
+ PERIODIC_QUERY(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX, QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET),
+ FILTER (IncrementalUpdateConstants.FILTER_PREFIX, QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET),
+ JOIN(IncrementalUpdateConstants.JOIN_PREFIX, QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET),
+ STATEMENT_PATTERN(IncrementalUpdateConstants.SP_PREFIX, QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET),
+ QUERY(IncrementalUpdateConstants.QUERY_PREFIX, QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET),
+ AGGREGATION(IncrementalUpdateConstants.AGGREGATION_PREFIX, QueryNodeMetadataColumns.AGGREGATION_COLUMNS, FluoQueryColumns.AGGREGATION_BINDING_SET),
+ PROJECTION(IncrementalUpdateConstants.PROJECTION_PREFIX, QueryNodeMetadataColumns.PROJECTION_COLUMNS, FluoQueryColumns.PROJECTION_BINDING_SET),
+ CONSTRUCT(IncrementalUpdateConstants.CONSTRUCT_PREFIX, QueryNodeMetadataColumns.CONSTRUCT_COLUMNS, FluoQueryColumns.CONSTRUCT_STATEMENTS);
//Metadata Columns associated with given NodeType
private QueryNodeMetadataColumns metadataColumns;
@@ -53,15 +56,25 @@ public enum NodeType {
//Column where results are stored for given NodeType
private Column resultColumn;
+ //Prefix for the given node type
+ private String nodePrefix;
/**
* Constructs an instance of {@link NodeType}.
*
* @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null)
* @param resultColumn - The {@link Column} used to store this {@link NodeType}'s results. (not null)
*/
- private NodeType(QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
+ private NodeType(String nodePrefix, QueryNodeMetadataColumns metadataColumns, Column resultColumn) {
this.metadataColumns = requireNonNull(metadataColumns);
this.resultColumn = requireNonNull(resultColumn);
+ this.nodePrefix = requireNonNull(nodePrefix);
+ }
+
+ /**
+ * @return the prefix for the given node type
+ */
+ public String getNodeTypePrefix() {
+ return nodePrefix;
}
/**
@@ -103,10 +116,38 @@ public enum NodeType {
type = AGGREGATION;
} else if(nodeId.startsWith(CONSTRUCT_PREFIX)) {
type = CONSTRUCT;
+ } else if(nodeId.startsWith(PROJECTION_PREFIX)) {
+ type = PROJECTION;
} else if(nodeId.startsWith(PERIODIC_QUERY_PREFIX)) {
type = PERIODIC_QUERY;
}
return Optional.fromNullable(type);
}
+
+ /**
+ * Creates an id for a given NodeType that is of the form {@link NodeType#getNodeTypePrefix()} + "_" + pcjId,
+ * where the pcjId is an auto generated UUID with all dashes removed.
+ * @param type {@link NodeType}
+ * @return id for the given NodeType
+ */
+ public static String generateNewFluoIdForType(NodeType type) {
+ String unique = UUID.randomUUID().toString().replaceAll("-", "");
+ // Put them together to create the Node ID.
+ return type.getNodeTypePrefix() + "_" + unique;
+ }
+
+ /**
+ * Creates an id for a given NodeType that is of the form {@link NodeType#getNodeTypePrefix()} + "_" + pcjId
+ *
+ * @param type {@link NodeType}
+ * @return id for the given NodeType
+ */
+ public static String generateNewIdForType(NodeType type, String pcjId) {
+ // Put them together to create the Node ID.
+ return type.getNodeTypePrefix() + "_" + pcjId;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
index ae4912b..cb331cf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
@@ -34,7 +34,6 @@ import org.openrdf.model.Literal;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.Binding;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
new file mode 100644
index 0000000..f9d8257
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the results of a Projection node when one of its children has added a
+ * new Binding Set to its results.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ProjectionResultUpdater {
+ private static final Logger log = Logger.getLogger(QueryResultUpdater.class);
+
+ private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+
+ /**
+ * Updates the results of a Projection node when one of its children has added a
+ * new Binding Set to its results.
+ *
+ * @param tx - The transaction all Fluo queries will use. (not null)
+ * @param childBindingSet - A binding set that the query's child node has emmitted. (not null)
+ * @param projectionMetadata - The metadata of the Query whose results will be updated. (not null)
+ * @throws Exception A problem caused the update to fail.
+ */
+ public void updateProjectionResults(
+ final TransactionBase tx,
+ final VisibilityBindingSet childBindingSet,
+ final ProjectionMetadata projectionMetadata) throws Exception {
+ checkNotNull(tx);
+ checkNotNull(childBindingSet);
+ checkNotNull(projectionMetadata);
+
+ log.trace(
+ "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+ "Node ID: " + projectionMetadata.getNodeId() + "\n" +
+ "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" +
+ "Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" +
+ "Child Binding Set:\n" + childBindingSet + "\n");
+
+ // Create the query's Binding Set from the child node's binding set.
+ final VariableOrder queryVarOrder = projectionMetadata.getVariableOrder();
+ final VariableOrder projectionVarOrder = projectionMetadata.getProjectedVars();
+ final BindingSet queryBindingSet = BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet);
+
+ // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables.
+ Bytes resultRow = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet);
+
+ // Create the Binding Set that goes in the Node Value. It does contain visibilities.
+ final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()));
+
+ log.trace(
+ "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+ "New Binding Set: " + childBindingSet + "\n");
+
+ tx.set(resultRow, FluoQueryColumns.PROJECTION_BINDING_SET, nodeValueBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 44fc9bd..37d7256 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -23,16 +23,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
-import org.openrdf.query.BindingSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -45,7 +41,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public class QueryResultUpdater {
private static final Logger log = Logger.getLogger(QueryResultUpdater.class);
- private static final FluoQueryMetadataDAO METADATA_DA0 = new FluoQueryMetadataDAO();
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
/**
@@ -73,23 +68,12 @@ public class QueryResultUpdater {
// Create the query's Binding Set from the child node's binding set.
final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
- final BindingSet queryBindingSet = BindingSetUtil.keepBindings(queryVarOrder, childBindingSet);
// Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables.
- final Bytes resultRow;
-
- final String childNodeId = queryMetadata.getChildNodeId();
- final boolean isGrouped = childNodeId.startsWith( IncrementalUpdateConstants.AGGREGATION_PREFIX );
- if(isGrouped) {
- final AggregationMetadata aggMetadata = METADATA_DA0.readAggregationMetadata(tx, childNodeId);
- final VariableOrder groupByVars = aggMetadata.getGroupByVariableOrder();
- resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), groupByVars, queryBindingSet);
- } else {
- resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, queryBindingSet);
- }
+ final Bytes resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet);
// Create the Binding Set that goes in the Node Value. It does contain visibilities.
- final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet,childBindingSet.getVisibility()));
+ final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
log.trace(
"Transaction ID: " + tx.getStartTimestamp() + "\n" +
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
index a15743f..fa27b46 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -30,8 +30,6 @@ import org.apache.rya.api.domain.RyaSubGraph;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-import com.google.common.base.Preconditions;
-
/**
* Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index 3a731c2..7d0fd5e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -31,6 +31,7 @@ import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.ProjectionResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
@@ -38,6 +39,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -61,6 +63,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
+ private final ProjectionResultUpdater projectionUpdater = new ProjectionResultUpdater();
private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater();
@Override
@@ -107,6 +110,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
}
break;
+ case PROJECTION:
+ final ProjectionMetadata projectionQuery = queryDao.readProjectionMetadata(tx, parentNodeId);
+ try {
+ projectionUpdater.updateProjectionResults(tx, observedBindingSet, projectionQuery);
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not process a Query node.", e);
+ }
+ break;
+
case CONSTRUCT:
final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
try{
@@ -154,7 +166,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
default:
- throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
+ throw new IllegalArgumentException("The parent node's NodeType must be of type Aggregation, Projection, ConstructQuery, Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
new file mode 100644
index 0000000..b712606
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+/**
+ * Performs incremental result exporting to the configured destinations.
+ */
+public class ProjectionObserver extends BindingSetUpdater {
+ private static final Logger log = Logger.getLogger(ProjectionObserver.class);
+
+ private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+ private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ return new ObservedColumn(FluoQueryColumns.PROJECTION_BINDING_SET, NotificationType.STRONG);
+ }
+
+ @Override
+ public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception {
+ requireNonNull(tx);
+ requireNonNull(row);
+
+ // Read the Filter metadata.
+ final String projectionNodeId = BindingSetRow.make(row).getNodeId();
+ final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId);
+
+ // Read the Visibility Binding Set from the value.
+ final Bytes valueBytes = tx.get(row, FluoQueryColumns.PROJECTION_BINDING_SET);
+ final VisibilityBindingSet projectionBindingSet = BS_SERDE.deserialize(valueBytes);
+
+ // Figure out which node needs to handle the new metadata.
+ final String parentNodeId = projectionMetadata.getParentNodeId();
+
+ return new Observation(projectionNodeId, projectionBindingSet, parentNodeId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index fbdca08..e6368ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -107,7 +107,7 @@ public class QueryResultObserver extends AbstractObserver {
// Read the Child Binding Set that will be exported.
final Bytes valueBytes = tx.get(brow, col);
final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes);
-
+
// Simplify the result's visibilities.
final String visibility = result.getVisibility();
if(!simplifiedVisibilities.containsKey(visibility)) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index ff42a0f..eaa072f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -287,7 +287,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
* Builds instances of {@link AggregationMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder {
private final String nodeId;
private VariableOrder varOrder;
@@ -317,7 +317,7 @@ public class AggregationMetadata extends CommonNodeMetadata {
* single variable because aggregations are only able to emit the aggregated value.
* @return This builder so that method invocations may be chained.
*/
- public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+ public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
this.varOrder = varOrder;
return this;
}
@@ -350,6 +350,10 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.childNodeId = childNodeId;
return this;
}
+
+ public String getChildNodeId() {
+ return childNodeId;
+ }
/**
* @param aggregation - An aggregation that will be performed over the BindingSets that are emitted from the child node.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
index e54acf1..a20fe4d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
@@ -99,4 +99,18 @@ public abstract class CommonNodeMetadata {
.append("}")
.toString();
}
+
+ /**
+ * Base interface for all metadata Builders. Using this type def
+ * allows for the implementation of a Builder visitor for navigating
+ * the Builder tree.
+ *
+ */
+ public static interface Builder {
+
+ public String getNodeId();
+
+ public VariableOrder getVariableOrder();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
index e836c5d..6bf968e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
@@ -38,7 +38,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
private String childNodeId;
private ConstructGraph graph;
- private String sparql;
+ private String parentNodeId;
/**
* Creates ConstructQueryMetadata object from the provided metadata arguments.
@@ -47,21 +47,11 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
* @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statement representing construct graph
* @param sparql - SPARQL query containing construct graph
*/
- public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) {
- super(nodeId, new VariableOrder("subject", "predicate", "object"));
- Preconditions.checkNotNull(childNodeId);
- Preconditions.checkNotNull(graph);
- Preconditions.checkNotNull(sparql);
- this.childNodeId = childNodeId;
- this.graph = graph;
- this.sparql = sparql;
- }
-
- /**
- * @return sparql query string representing this construct query
- */
- public String getSparql() {
- return sparql;
+ public ConstructQueryMetadata(String nodeId, String parentNodeId, String childNodeId, VariableOrder varOrder, ConstructGraph graph) {
+ super(nodeId, varOrder);
+ this.childNodeId = Preconditions.checkNotNull(childNodeId);
+ this.parentNodeId = Preconditions.checkNotNull(parentNodeId);
+ this.graph = Preconditions.checkNotNull(graph);
}
/**
@@ -71,6 +61,13 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
public String getChildNodeId() {
return childNodeId;
}
+
+ /**
+ * @return The parent of this construct node
+ */
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
/**
* @return The ConstructGraph used to form statement {@link BindingSet}s for
@@ -82,7 +79,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
@Override
public int hashCode() {
- return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql);
+ return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), parentNodeId, childNodeId, graph);
}
@Override
@@ -94,8 +91,8 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
if (o instanceof ConstructQueryMetadata) {
ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o;
if (super.equals(queryMetadata)) {
- return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
- .append(sparql, queryMetadata.sparql).isEquals();
+ return new EqualsBuilder().append(parentNodeId, queryMetadata.parentNodeId).append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
+ .isEquals();
}
return false;
}
@@ -105,7 +102,7 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
@Override
public String toString() {
return new StringBuilder().append("Construct Query Metadata {\n").append(" Node ID: " + super.getNodeId() + "\n")
- .append(" SPARQL QUERY: " + sparql + "\n").append(" Variable Order: " + super.getVariableOrder() + "\n")
+ .append(" Variable Order: " + super.getVariableOrder() + "\n")
.append(" Child Node ID: " + childNodeId + "\n").append(" Construct Graph: " + graph.getProjections() + "\n")
.append("}").toString();
}
@@ -123,13 +120,14 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
* Builds instances of {@link QueryMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder {
private String nodeId;
private ConstructGraph graph;
+ private String parentNodeId;
private String childNodeId;
- private String sparql;
+ private VariableOrder varOrder;
/**
* Set the node Id that identifies this Construct Query Node
@@ -144,21 +142,31 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
}
/**
- * Set the SPARQL String representing this construct query
- * @param SPARQL string representing this construct query
+ * @return the node id for this construct query
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Sets the VariableOrder that determines how results will be written
+ * @param varOrder
+ * @return This builder so that method invocations may be chained.
*/
- public Builder setSparql(String sparql) {
- this.sparql = sparql;
+ public Builder setVarOrder(VariableOrder varOrder) {
+ this.varOrder = varOrder;
return this;
}
+
+ @Override
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
/**
- * Set the ConstructGraph used to form statement {@link BindingSet}s for
- * this Construct Query
+ * Set the ConstructGraph used to form statement {@link BindingSet}s for this Construct Query
*
- * @param varOrder
- * - ConstructGraph to project {@link BindingSet}s onto RDF
- * statements
+ * @param varOrder - ConstructGraph to project {@link BindingSet}s onto RDF statements
* @return This builder so that method invocations may be chained.
*/
public Builder setConstructGraph(ConstructGraph graph) {
@@ -167,25 +175,37 @@ public class ConstructQueryMetadata extends CommonNodeMetadata {
}
/**
- * Set the node whose results are projected onto the given
- * {@link ConstructGraph}.
+ * Set the node whose results are projected onto the given {@link ConstructGraph}.
*
- * @param childNodeId
- * - The node whose results are projected onto the given
- * {@link ConstructGraph}.
+ * @param childNodeId - The node whose results are projected onto the given {@link ConstructGraph}.
* @return This builder so that method invocations may be chained.
*/
public Builder setChildNodeId(String childNodeId) {
this.childNodeId = childNodeId;
return this;
}
+
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
+ /**
+ * Set the parent node of this {@link ConstructGraph}.
+ *
+ * @param parentNodeId - The the parent node of this {@link ConstructGraph}.
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setParentNodeId(String parentNodeId) {
+ this.parentNodeId = parentNodeId;
+ return this;
+ }
/**
* @return An instance of {@link ConstructQueryMetadata} build using
* this builder's values.
*/
public ConstructQueryMetadata build() {
- return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql);
+ return new ConstructQueryMetadata(nodeId, parentNodeId, childNodeId, varOrder, graph);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
index 7e2e995..a821d8c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
@@ -145,7 +145,7 @@ public class FilterMetadata extends CommonNodeMetadata {
* Builds instances of {@link FilterMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder{
private final String nodeId;
private VariableOrder varOrder;
@@ -179,6 +179,11 @@ public class FilterMetadata extends CommonNodeMetadata {
this.varOrder = varOrder;
return this;
}
+
+ @Override
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
/**
* Set the original SPARQL query the filter is derived from.
@@ -212,6 +217,10 @@ public class FilterMetadata extends CommonNodeMetadata {
this.childNodeId = childNodeId;
return this;
}
+
+ public String getChildNodeId() {
+ return childNodeId;
+ }
/**
* @return Returns an instance of {@link FilterMetadata} using this builder's values.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 8d218af..65db02c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
@@ -44,7 +45,8 @@ import net.jcip.annotations.Immutable;
@DefaultAnnotation(NonNull.class)
public class FluoQuery {
- private final Optional<QueryMetadata> queryMetadata;
+ private final QueryMetadata queryMetadata;
+ private final ImmutableMap<String, ProjectionMetadata> projectionMetadata;
private final Optional<ConstructQueryMetadata> constructMetadata;
private final Optional<PeriodicQueryMetadata> periodicQueryMetadata;
private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
@@ -52,13 +54,15 @@ public class FluoQuery {
private final ImmutableMap<String, JoinMetadata> joinMetadata;
private final ImmutableMap<String, AggregationMetadata> aggregationMetadata;
private final QueryType type;
- public static enum QueryType {Projection, Construct};
+ private final String queryId;
/**
* Constructs an instance of {@link FluoQuery}. Private because applications
* must use {@link Builder} instead.
*
- * @param queryMetadata - The root node of a query that is updated in Fluo. (not null)
+ * @param queryMetadata - metadata for the query for handling results (not null)
+ * @param projectionMetadata - projection nodes of query that project results (not null)
+ * @param constructMetadata - construct node of query that creates subgraphs
* @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
* @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
* it is represented within the Fluo app. (not null)
@@ -71,52 +75,27 @@ public class FluoQuery {
*/
private FluoQuery(
final QueryMetadata queryMetadata,
+ final ImmutableMap<String, ProjectionMetadata> projectionMetadata,
+ final Optional<ConstructQueryMetadata> constructMetadata,
final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
final ImmutableMap<String, FilterMetadata> filterMetadata,
final ImmutableMap<String, JoinMetadata> joinMetadata,
final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
this.aggregationMetadata = requireNonNull(aggregationMetadata);
- this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
- this.constructMetadata = Optional.absent();
+ this.queryMetadata = requireNonNull(queryMetadata);
+ this.queryId = queryMetadata.getNodeId();
+ this.projectionMetadata = requireNonNull(projectionMetadata);
+ this.constructMetadata = constructMetadata;
this.periodicQueryMetadata = periodicQueryMetadata;
this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
- this.type = QueryType.Projection;
- }
-
-
- /**
- * Constructs an instance of {@link FluoQuery}. Private because applications
- * must use {@link Builder} instead.
- *
- * @param constructMetadata - The root node of a query that is updated in Fluo. (not null)
- * @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
- * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
- * it is represented within the Fluo app. (not null)
- * @param filterMetadata A map from Node ID to Filter metadata as it is represented
- * within the Fluo app. (not null)
- * @param joinMetadata - A map from Node ID to Join metadata as it is represented
- * within the Fluo app. (not null)
- * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
- * represented within the Fluo app. (not null)
- */
- private FluoQuery(
- final ConstructQueryMetadata constructMetadata,
- final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
- final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
- final ImmutableMap<String, FilterMetadata> filterMetadata,
- final ImmutableMap<String, JoinMetadata> joinMetadata,
- final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
- this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
- this.queryMetadata = Optional.absent();
- this.periodicQueryMetadata = periodicQueryMetadata;
- this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
- this.filterMetadata = requireNonNull(filterMetadata);
- this.joinMetadata = requireNonNull(joinMetadata);
- this.aggregationMetadata = aggregationMetadata;
- this.type = QueryType.Construct;
+ if(constructMetadata.isPresent()) {
+ this.type = QueryType.Construct;
+ } else {
+ this.type = QueryType.Projection;
+ }
}
/**
@@ -126,24 +105,86 @@ public class FluoQuery {
public QueryType getQueryType() {
return type;
}
+
+ /**
+ * @return the unique id of this query
+ */
+ public String getQueryId() {
+ return queryId;
+ }
/**
* @return Metadata about the root node of a query that is updated within the Fluo app.
*/
- public Optional<QueryMetadata> getQueryMetadata() {
+ public QueryMetadata getQueryMetadata() {
return queryMetadata;
}
+ /**
+ * @param nodeId - node id of the query metadata
+ * @return Optional containing the queryMetadata if it matches the specified nodeId
+ */
+ public Optional<QueryMetadata> getQueryMetadata(String nodeId) {
+ if(queryMetadata.getNodeId().equals(nodeId)) {
+ return Optional.of(queryMetadata);
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * @return construct query metadata for generating subgraphs
+ */
public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
return constructMetadata;
}
/**
+ * @param nodeId - node id of the ConstructMetadata
+ * @return Optional containing the ConstructMetadata if it is present and has the given nodeId
+ */
+ public Optional<ConstructQueryMetadata> getConstructQueryMetadata(String nodeId) {
+ if(constructMetadata.isPresent() && constructMetadata.get().getNodeId().equals(nodeId)) {
+ return constructMetadata;
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * @param nodeId - id of the Projection metadata you want (not null)
+ * @return projection metadata corresponding to give nodeId
+ */
+ public Optional<ProjectionMetadata> getProjectionMetadata(String nodeId) {
+ return Optional.fromNullable(projectionMetadata.get(nodeId));
+ }
+
+ /**
+ * @return All of the projection metadata that is stored for the query
+ */
+ public Collection<ProjectionMetadata> getProjectionMetadata() {
+ return projectionMetadata.values();
+ }
+
+ /**
* @return All of the Periodic Query metadata that is stored for the query.
*/
public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() {
return periodicQueryMetadata;
}
+
+ /**
+ * @param nodeId - id of the PeriodicQueryMetadata
+ * @return Optional containing the PeriodicQueryMetadata if it is present and has the given nodeId
+ */
+ public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(String nodeId) {
+
+ if(periodicQueryMetadata.isPresent() && periodicQueryMetadata.get().getNodeId().equals(nodeId)) {
+ return periodicQueryMetadata;
+ } else {
+ return Optional.absent();
+ }
+ }
/**
* Get a Statement Pattern node's metadata.
@@ -254,8 +295,11 @@ public class FluoQuery {
public String toString() {
final StringBuilder builder = new StringBuilder();
- if(queryMetadata.isPresent()) {
- builder.append( queryMetadata.get().toString() );
+ builder.append(queryMetadata.toString());
+ builder.append("\n");
+
+ for(final ProjectionMetadata metadata : projectionMetadata.values()) {
+ builder.append(metadata);
builder.append("\n");
}
@@ -305,9 +349,10 @@ public class FluoQuery {
@DefaultAnnotation(NonNull.class)
public static final class Builder {
- private QueryMetadata.Builder queryBuilder = null;
- private ConstructQueryMetadata.Builder constructBuilder = null;
- private PeriodicQueryMetadata.Builder periodicQueryBuilder = null;
+ private QueryMetadata.Builder queryBuilder;
+ private ConstructQueryMetadata.Builder constructBuilder;
+ private PeriodicQueryMetadata.Builder periodicQueryBuilder;
+ private final Map<String, ProjectionMetadata.Builder> projectionBuilders = new HashMap<>();
private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -319,23 +364,55 @@ public class FluoQuery {
* @param queryBuilder - The builder representing the query's results.
* @return This builder so that method invocation may be chained.
*/
- public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) {
- this.queryBuilder = queryBuilder;
+ public Builder setQueryMetadata(final QueryMetadata.Builder queryBuilder) {
+ this.queryBuilder = requireNonNull(queryBuilder);
return this;
}
/**
* @return The Query metadata builder if one has been set.
*/
- public Optional<QueryMetadata.Builder> getQueryBuilder() {
- return Optional.fromNullable( queryBuilder );
+ public QueryMetadata.Builder getQueryBuilder() {
+ return queryBuilder;
+ }
+
+ /**
+ * @param nodeId - id of the QueryMetadata.Builder
+ * @return Optional containing the QueryMetadata.Builder if it has the specified nodeId
+ */
+ public Optional<QueryMetadata.Builder> getQueryBuilder(String nodeId) {
+ if(queryBuilder.getNodeId().equals(nodeId)) {
+ return Optional.of(queryBuilder);
+ } else {
+ return Optional.absent();
+ }
+
+ }
+
+ /**
+ * Sets the {@link ProjectionMetadata.Builder} that is used by this builder.
+ *
+ * @param projectionBuilder - The builder representing this query's projection
+ * @return This builder so that method invocation may be chained.
+ */
+ public Builder addProjectionBuilder(@Nullable final ProjectionMetadata.Builder projectionBuilder) {
+ requireNonNull(projectionBuilder);
+ projectionBuilders.put(projectionBuilder.getNodeId(), projectionBuilder);
+ return this;
+ }
+
+ /**
+ * @return The ProjectionMetadata builder if one has been set.
+ */
+ public Optional<ProjectionMetadata.Builder> getProjectionBuilder(String nodeId) {
+ requireNonNull(nodeId);
+ return Optional.fromNullable( projectionBuilders.get(nodeId) );
}
/**
* Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder.
*
- * @param constructBuilder
- * - The builder representing the query's results.
+ * @param constructBuilder - The builder representing the query's results.
* @return This builder so that method invocation may be chained.
*/
public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) {
@@ -344,6 +421,18 @@ public class FluoQuery {
}
/**
+ * @param id of the ConstructQueryMetadata.Builder
+ * @return Optional containing the ConstructQueryMetadata.Builder if it has been set and has the given nodeId.
+ */
+ public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder(String nodeId) {
+ if(constructBuilder != null && constructBuilder.getNodeId().equals(nodeId)) {
+ return Optional.of(constructBuilder);
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
* @return The Construct Query metadata builder if one has been set.
*/
public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() {
@@ -442,8 +531,6 @@ public class FluoQuery {
this.aggregationBuilders.put(aggregationBuilder.getNodeId(), aggregationBuilder);
return this;
}
-
-
/**
* Adds a new {@link PeriodicQueryMetadata.Builder} to this builder.
@@ -457,7 +544,6 @@ public class FluoQuery {
return this;
}
-
/**
* Get a PeriodicQuery builder from this builder.
*
@@ -467,24 +553,35 @@ public class FluoQuery {
return Optional.fromNullable( periodicQueryBuilder);
}
-
/**
- * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
+ * @param - id of the PeriodicQueryMetadata.Builder
+ * @return - Optional containing the PeriodicQueryMetadata.Builder if one has been set and it has the given nodeId
*/
- public FluoQuery build() {
- checkArgument((queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
+ public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder(String nodeId) {
- Optional<QueryMetadata.Builder> optionalQueryBuilder = getQueryBuilder();
- QueryMetadata queryMetadata = null;
- if(optionalQueryBuilder.isPresent()) {
- queryMetadata = optionalQueryBuilder.get().build();
+ if(periodicQueryBuilder != null && periodicQueryBuilder.getNodeId().equals(nodeId)) {
+ return Optional.of(periodicQueryBuilder);
+ } else {
+ return Optional.absent();
}
+ }
+
+ /**
+ * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
+ */
+ public FluoQuery build() {
+ checkArgument((projectionBuilders.size() > 0 || constructBuilder != null));
Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
PeriodicQueryMetadata periodicQueryMetadata = null;
if(optionalPeriodicQueryBuilder.isPresent()) {
periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build();
}
+
+ final ImmutableMap.Builder<String, ProjectionMetadata> projectionMetadata = ImmutableMap.builder();
+ for(final Entry<String, ProjectionMetadata.Builder> entry : projectionBuilders.entrySet()) {
+ projectionMetadata.put(entry.getKey(), entry.getValue().build());
+ }
final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -506,12 +603,13 @@ public class FluoQuery {
aggregateMetadata.put(entry.getKey(), entry.getValue().build());
}
- if(queryBuilder != null) {
- return new FluoQuery(queryBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
- }
- //constructBuilder non-null in this case, but no need to check
- else {
- return new FluoQuery(constructBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ if(constructBuilder != null) {
+ if(periodicQueryMetadata != null) {
+ throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported.");
+ }
+ return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ } else {
+ return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index ed18d49..8cd25d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -42,6 +42,20 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
* <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
* <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ * <tr> <td>Node ID</td> <td>queryMetadata:queryType</td> <td>The {@link QueryType} of this query.</td> </tr>
+ * <tr> <td>Node ID</td> <td>queryMetadata:exportStrategies</td> <td>Strategies for exporting results from Rya Fluo app</td> </tr>
+ * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
+ * </table>
+ * </p>
+ * <p>
+ * <b>Projection Metadata</b>
+ * <table border="1" style="width:100%">
+ * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
+ * <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
+ * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*
+ * <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr>
+ * <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
+ * <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr>
* <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
* </table>
* </p>
@@ -50,11 +64,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
* <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
- * <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr>
* <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
* <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr>
* <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
- * <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
+ * <tr> <td>Node ID</td> <td>constructMetadata:parentNodeId</td> <td>The Node ID of the parent that this node feeds.</td> </tr>
+ * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr>
* </table>
* </p>
* <p>
@@ -131,6 +145,7 @@ public class FluoQueryColumns {
public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
+ public static final String PROJECTION_METADATA_CF = "projectionMetadata";
public static final String PERIODIC_QUERY_METADATA_CF = "periodicQueryMetadata";
/**
@@ -178,14 +193,24 @@ public class FluoQueryColumns {
public static final Column QUERY_SPARQL = new Column(QUERY_METADATA_CF, "sparql");
public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId");
public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
+ public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies");
+ public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType");
+
+ // Query Metadata columns.
+ public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId");
+ public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars");
+ public static final Column PROJECTION_VARIABLE_ORDER = new Column(PROJECTION_METADATA_CF, "variableOrder");
+ public static final Column PROJECTION_CHILD_NODE_ID = new Column(PROJECTION_METADATA_CF, "childNodeId");
+ public static final Column PROJECTION_PARENT_NODE_ID = new Column(PROJECTION_METADATA_CF, "parentNodeId");
+ public static final Column PROJECTION_BINDING_SET = new Column(PROJECTION_METADATA_CF, "bindingSet");
// Construct Query Metadata columns.
public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId");
public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder");
public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph");
public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId");
+ public static final Column CONSTRUCT_PARENT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "parentNodeId");
public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements");
- public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql");
// Filter Metadata columns.
public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
@@ -256,8 +281,20 @@ public class FluoQueryColumns {
Arrays.asList(QUERY_NODE_ID,
QUERY_VARIABLE_ORDER,
QUERY_SPARQL,
+ QUERY_TYPE,
+ QUERY_EXPORT_STRATEGIES,
QUERY_CHILD_NODE_ID)),
+ /**
+ * The columns a {@link ProjectionMetadata} object's fields are stored within.
+ */
+ PROJECTION_COLUMNS(
+ Arrays.asList(PROJECTION_NODE_ID,
+ PROJECTION_PROJECTED_VARS,
+ PROJECTION_VARIABLE_ORDER,
+ PROJECTION_PARENT_NODE_ID,
+ PROJECTION_CHILD_NODE_ID)),
+
/**
* The columns a {@link PeriodicBinMetadata} object's fields are stored within.
@@ -280,7 +317,7 @@ public class FluoQueryColumns {
CONSTRUCT_VARIABLE_ORDER,
CONSTRUCT_GRAPH,
CONSTRUCT_CHILD_NODE_ID,
- CONSTRUCT_SPARQL,
+ CONSTRUCT_PARENT_NODE_ID,
CONSTRUCT_STATEMENTS)),
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 8675b80..5ba7383 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -25,7 +25,9 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.fluo.api.client.SnapshotBase;
@@ -34,6 +36,9 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -42,7 +47,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -64,10 +68,14 @@ public class FluoQueryMetadataDAO {
requireNonNull(tx);
requireNonNull(metadata);
+ Joiner joiner = Joiner.on(IncrementalUpdateConstants.VAR_DELIM);
+
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString());
tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, metadata.getSparql() );
+ tx.set(rowId, FluoQueryColumns.QUERY_EXPORT_STRATEGIES, joiner.join(metadata.getExportStrategies()));
+ tx.set(rowId, FluoQueryColumns.QUERY_TYPE, metadata.getQueryType().toString());
tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, metadata.getChildNodeId() );
}
@@ -91,6 +99,8 @@ public class FluoQueryMetadataDAO {
final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.QUERY_VARIABLE_ORDER,
FluoQueryColumns.QUERY_SPARQL,
+ FluoQueryColumns.QUERY_TYPE,
+ FluoQueryColumns.QUERY_EXPORT_STRATEGIES,
FluoQueryColumns.QUERY_CHILD_NODE_ID);
// Return an object holding them.
@@ -99,13 +109,81 @@ public class FluoQueryMetadataDAO {
final String sparql = values.get(FluoQueryColumns.QUERY_SPARQL);
final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID);
+ final String queryType = values.get(FluoQueryColumns.QUERY_TYPE);
+ final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM);
+
+ Set<ExportStrategy> strategies = new HashSet<>();
+ for(String strategy: exportStrategies) {
+ strategies.add(ExportStrategy.valueOf(strategy));
+ }
return QueryMetadata.builder(nodeId)
- .setVariableOrder( varOrder )
+ .setVarOrder( varOrder )
.setSparql( sparql )
+ .setExportStrategies(strategies)
+ .setQueryType(QueryType.valueOf(queryType))
.setChildNodeId( childNodeId );
}
+
+
+ /**
+ * Write an instance of {@link ProjectionMetadata} to the Fluo table.
+ *
+ * @param tx - The transaction that will be used to commit the metadata. (not null)
+ * @param metadata - The Query node metadata that will be written to the table. (not null)
+ */
+ public void write(final TransactionBase tx, final ProjectionMetadata metadata) {
+ requireNonNull(tx);
+ requireNonNull(metadata);
+
+ final String rowId = metadata.getNodeId();
+ tx.set(rowId, FluoQueryColumns.PROJECTION_NODE_ID, rowId);
+ tx.set(rowId, FluoQueryColumns.PROJECTION_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.PROJECTION_PROJECTED_VARS, metadata.getProjectedVars().toString());
+ tx.set(rowId, FluoQueryColumns.PROJECTION_PARENT_NODE_ID, metadata.getParentNodeId());
+ tx.set(rowId, FluoQueryColumns.PROJECTION_CHILD_NODE_ID, metadata.getChildNodeId() );
+ }
+
+ /**
+ * Read an instance of {@link ProjectionMetadata} from the Fluo table.
+ *
+ * @param sx - The snapshot that will be used to read the metadata . (not null)
+ * @param nodeId - The nodeId of the Projection node that will be read. (not null)
+ * @return The {@link ProjectionMetadata} that was read from the table.
+ */
+ public ProjectionMetadata readProjectionMetadata(final SnapshotBase sx, final String nodeId) {
+ return readProjectionMetadataBuilder(sx, nodeId).build();
+ }
+
+ private ProjectionMetadata.Builder readProjectionMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+ requireNonNull(sx);
+ requireNonNull(nodeId);
+
+ // Fetch the values from the Fluo table.
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
+ FluoQueryColumns.PROJECTION_VARIABLE_ORDER,
+ FluoQueryColumns.PROJECTION_PROJECTED_VARS,
+ FluoQueryColumns.PROJECTION_PARENT_NODE_ID,
+ FluoQueryColumns.PROJECTION_CHILD_NODE_ID);
+ // Return an object holding them.
+ final String varOrderString = values.get(FluoQueryColumns.PROJECTION_VARIABLE_ORDER);
+ final String projectedVarString = values.get(FluoQueryColumns.PROJECTION_PROJECTED_VARS);
+ final VariableOrder varOrder = new VariableOrder(varOrderString);
+ final VariableOrder projectedVars = new VariableOrder(projectedVarString);
+ final String childNodeId = values.get(FluoQueryColumns.PROJECTION_CHILD_NODE_ID);
+ final String parentNodeId = values.get(FluoQueryColumns.PROJECTION_PARENT_NODE_ID);
+
+
+ return ProjectionMetadata.builder(nodeId)
+ .setVarOrder( varOrder )
+ .setProjectedVars(projectedVars)
+ .setParentNodeId(parentNodeId)
+ .setChildNodeId( childNodeId );
+ }
+
+
/**
* Write an instance of {@link ConstructQueryMetadata} to the Fluo table.
*
@@ -120,7 +198,7 @@ public class FluoQueryMetadataDAO {
tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId);
tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString());
tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() );
- tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql());
+ tx.set(rowId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID, metadata.getParentNodeId() );
tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph()));
}
@@ -143,18 +221,22 @@ public class FluoQueryMetadataDAO {
final String rowId = nodeId;
final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.CONSTRUCT_GRAPH,
- FluoQueryColumns.CONSTRUCT_SPARQL,
- FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
+ FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID,
+ FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID,
+ FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER);
final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH);
final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString);
final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID);
- final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL);
+ final String parentNodeId = values.get(FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID);
+ final String varOrderString = values.get(FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER);
+
return ConstructQueryMetadata.builder()
.setNodeId(nodeId)
+ .setParentNodeId(parentNodeId)
.setConstructGraph(graph)
- .setSparql(sparql)
+ .setVarOrder(new VariableOrder(varOrderString))
.setChildNodeId(childNodeId);
}
@@ -342,7 +424,7 @@ public class FluoQueryMetadataDAO {
final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
return JoinMetadata.builder(nodeId)
- .setVariableOrder(varOrder)
+ .setVarOrder(varOrder)
.setJoinType(joinType)
.setParentNodeId(parentNodeId)
.setLeftChildNodeId(leftChildNodeId)
@@ -477,7 +559,7 @@ public class FluoQueryMetadataDAO {
}
final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId)
- .setVariableOrder(varOrder)
+ .setVarOrder(varOrder)
.setParentNodeId(parentNodeId)
.setChildNodeId(childNodeId)
.setGroupByVariableOrder(groupByVars);
@@ -498,27 +580,28 @@ public class FluoQueryMetadataDAO {
public void write(final TransactionBase tx, final FluoQuery query) {
requireNonNull(tx);
requireNonNull(query);
+
+ QueryMetadata queryMetadata = query.getQueryMetadata();
+ final String sparql = queryMetadata.getSparql();
+ final String queryId = queryMetadata.getNodeId();
+ final String pcjId = queryMetadata.getExportId();
+
+ // The results of the query are eventually exported to an instance
+ // of Rya, so store the Rya ID for the PCJ.
+ tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+ tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+ tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
+ write(tx, queryMetadata);
// Write the rest of the metadata objects.
- switch (query.getQueryType()) {
- case Construct:
+
+ if (query.getQueryType() == QueryType.Construct) {
ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
- // Store the Query ID so that it may be looked up from the original
- // SPARQL string.
- final String constructSparql = constructMetadata.getSparql();
- final String constructQueryId = constructMetadata.getNodeId();
- tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
write(tx, constructMetadata);
- break;
- case Projection:
- QueryMetadata queryMetadata = query.getQueryMetadata().get();
- // Store the Query ID so that it may be looked up from the original
- // SPARQL string.
- final String sparql = queryMetadata.getSparql();
- final String queryId = queryMetadata.getNodeId();
- tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
- write(tx, queryMetadata);
- break;
+ }
+
+ for(final ProjectionMetadata projection : query.getProjectionMetadata()) {
+ write(tx, projection);
}
Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata();
@@ -569,16 +652,23 @@ public class FluoQueryMetadataDAO {
case QUERY:
// Add this node's metadata.
final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId);
- Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
builder.setQueryMetadata(queryBuilder);
// Add it's child's metadata.
addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId());
break;
-
+
+ case PROJECTION:
+ //Add this node's metadata
+ final ProjectionMetadata.Builder projectionBuilder = readProjectionMetadataBuilder(sx, childNodeId);
+ builder.addProjectionBuilder(projectionBuilder);
+
+ //Add it's child's metadata
+ addChildMetadata(sx, builder, projectionBuilder.build().getChildNodeId());
+ break;
+
case CONSTRUCT:
final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId);
- Preconditions.checkArgument(!builder.getQueryBuilder().isPresent());
builder.setConstructQueryMetadata(constructBuilder);
// Add it's child's metadata.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index 7bad9a7..aa79daf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -163,7 +163,7 @@ public class JoinMetadata extends CommonNodeMetadata {
* Builds instances of {@link JoinMetadata}.
*/
@DefaultAnnotation(NonNull.class)
- public static final class Builder {
+ public static final class Builder implements CommonNodeMetadata.Builder {
private final String nodeId;
private VariableOrder varOrder;
@@ -194,11 +194,16 @@ public class JoinMetadata extends CommonNodeMetadata {
* @param varOrder - The variable order of the binding sets that are emitted by this node.
* @return This builder so that method invocation could be chained.
*/
- public Builder setVariableOrder(@Nullable final VariableOrder varOrder) {
+ public Builder setVarOrder(@Nullable final VariableOrder varOrder) {
this.varOrder = varOrder;
return this;
}
+ @Override
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
+
/**
* Sets the node id of this node's parent.
*
@@ -242,6 +247,14 @@ public class JoinMetadata extends CommonNodeMetadata {
this.rightChildNodeId = rightChildNodeId;
return this;
}
+
+ public String getLeftChildNodeId() {
+ return leftChildNodeId;
+ }
+
+ public String getRightChildNodeId() {
+ return rightChildNodeId;
+ }
/**
* @return An instance of {@link JoinMetadata} built using this builder's values.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
index 33253f2..ae4b10e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
@@ -166,7 +166,7 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
/**
* Builder for chaining method calls to construct an instance of PeriodicQueryMetadata.
*/
- public static class Builder {
+ public static class Builder implements CommonNodeMetadata.Builder {
private String nodeId;
private VariableOrder varOrder;
@@ -200,11 +200,12 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
return this;
}
+
/**
* Returns {@link VariableOrder}
* @return VariableOrder that indicates order that results are written in
*/
- public VariableOrder getVarOrder() {
+ public VariableOrder getVariableOrder() {
return varOrder;
}
@@ -235,6 +236,10 @@ public class PeriodicQueryMetadata extends CommonNodeMetadata {
return this;
}
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
/**
* Sets window size for periodic query
* @param windowSize