You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:59 UTC
[6/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index a701052..8d218af 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -18,6 +18,7 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import java.util.Collection;
@@ -29,7 +30,6 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -46,6 +46,7 @@ public class FluoQuery {
private final Optional<QueryMetadata> queryMetadata;
private final Optional<ConstructQueryMetadata> constructMetadata;
+ private final Optional<PeriodicQueryMetadata> periodicQueryMetadata;
private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
private final ImmutableMap<String, FilterMetadata> filterMetadata;
private final ImmutableMap<String, JoinMetadata> joinMetadata;
@@ -58,6 +59,7 @@ public class FluoQuery {
* must use {@link Builder} instead.
*
* @param queryMetadata - The root node of a query that is updated in Fluo. (not null)
+ * @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
* @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
* it is represented within the Fluo app. (not null)
* @param filterMetadata - A map from Node ID to Filter metadata as it is represented
@@ -69,6 +71,7 @@ public class FluoQuery {
*/
private FluoQuery(
final QueryMetadata queryMetadata,
+ final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
final ImmutableMap<String, FilterMetadata> filterMetadata,
final ImmutableMap<String, JoinMetadata> joinMetadata,
@@ -76,6 +79,7 @@ public class FluoQuery {
this.aggregationMetadata = requireNonNull(aggregationMetadata);
this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
this.constructMetadata = Optional.absent();
+ this.periodicQueryMetadata = periodicQueryMetadata;
this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
@@ -88,23 +92,26 @@ public class FluoQuery {
* must use {@link Builder} instead.
*
* @param constructMetadata - The root node of a query that is updated in Fluo. (not null)
+ * @param periodicQueryMetadata - The periodic query node that is updated in Fluo.
* @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
* it is represented within the Fluo app. (not null)
* @param filterMetadata A map from Node ID to Filter metadata as it is represented
* within the Fluo app. (not null)
- * @param joinMetadata A map from Node ID to Join metadata as it is represented
+ * @param joinMetadata - A map from Node ID to Join metadata as it is represented
* within the Fluo app. (not null)
* @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
* represented within the Fluo app. (not null)
*/
private FluoQuery(
final ConstructQueryMetadata constructMetadata,
+ final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
final ImmutableMap<String, FilterMetadata> filterMetadata,
final ImmutableMap<String, JoinMetadata> joinMetadata,
final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
this.queryMetadata = Optional.absent();
+ this.periodicQueryMetadata = periodicQueryMetadata;
this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
@@ -130,6 +137,13 @@ public class FluoQuery {
public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
return constructMetadata;
}
+
+ /**
+ * @return All of the Periodic Query metadata that is stored for the query.
+ */
+ public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() {
+ return periodicQueryMetadata;
+ }
/**
* Get a Statement Pattern node's metadata.
@@ -207,6 +221,7 @@ public class FluoQuery {
public int hashCode() {
return Objects.hashCode(
queryMetadata,
+ periodicQueryMetadata,
statementPatternMetadata,
filterMetadata,
joinMetadata,
@@ -224,6 +239,7 @@ public class FluoQuery {
return new EqualsBuilder()
.append(queryMetadata, fluoQuery.queryMetadata)
.append(constructMetadata, fluoQuery.constructMetadata)
+ .append(periodicQueryMetadata, fluoQuery.periodicQueryMetadata)
.append(statementPatternMetadata, fluoQuery.statementPatternMetadata)
.append(filterMetadata, fluoQuery.filterMetadata)
.append(joinMetadata, fluoQuery.joinMetadata)
@@ -247,6 +263,11 @@ public class FluoQuery {
builder.append( constructMetadata.get().toString() );
builder.append("\n");
}
+
+ if(periodicQueryMetadata.isPresent()) {
+ builder.append(periodicQueryMetadata.get());
+ builder.append("\n");
+ }
for(final FilterMetadata metadata : filterMetadata.values()) {
builder.append(metadata);
@@ -286,6 +307,7 @@ public class FluoQuery {
private QueryMetadata.Builder queryBuilder = null;
private ConstructQueryMetadata.Builder constructBuilder = null;
+ private PeriodicQueryMetadata.Builder periodicQueryBuilder = null;
private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -388,6 +410,17 @@ public class FluoQuery {
}
/**
+ * Get a Join builder from this builder.
+ *
+ * @param nodeId - The Node ID the Join builder was stored at. (not null)
+ * @return The builder that was stored at the node id if one was found.
+ */
+ public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) {
+ requireNonNull(nodeId);
+ return Optional.fromNullable( joinBuilders.get(nodeId) );
+ }
+
+ /**
* Get an Aggregate builder from this builder.
*
* @param nodeId - The Node ID the Aggregate builder was stored at. (not null)
@@ -410,15 +443,28 @@ public class FluoQuery {
return this;
}
+
+
/**
- * Get a Join builder from this builder.
+ * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder.
*
- * @param nodeId - The Node ID the Join builder was stored at. (not null)
- * @return The builder that was stored at the node id if one was found.
+ * @param periodicQueryBuilder - A builder representing a specific Join within the query. (not null)
+ * @return This builder so that method invocation may be chained.
*/
- public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) {
- requireNonNull(nodeId);
- return Optional.fromNullable( joinBuilders.get(nodeId) );
+ public Builder addPeriodicQueryMetadata(final PeriodicQueryMetadata.Builder periodicQueryBuilder) {
+ requireNonNull(periodicQueryBuilder);
+ this.periodicQueryBuilder = periodicQueryBuilder;
+ return this;
+ }
+
+
+ /**
+ * Get a PeriodicQuery builder from this builder.
+ *
+ * @return The PeriodicQuery builder if one has been set.
+ */
+ public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder() {
+ return Optional.fromNullable( periodicQueryBuilder);
}
@@ -426,8 +472,19 @@ public class FluoQuery {
* @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
*/
public FluoQuery build() {
- Preconditions.checkArgument(
- (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
+ checkArgument((queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
+
+ Optional<QueryMetadata.Builder> optionalQueryBuilder = getQueryBuilder();
+ QueryMetadata queryMetadata = null;
+ if(optionalQueryBuilder.isPresent()) {
+ queryMetadata = optionalQueryBuilder.get().build();
+ }
+
+ Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
+ PeriodicQueryMetadata periodicQueryMetadata = null;
+ if(optionalPeriodicQueryBuilder.isPresent()) {
+ periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build();
+ }
final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -450,11 +507,11 @@ public class FluoQuery {
}
if(queryBuilder != null) {
- return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ return new FluoQuery(queryBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
}
//constructBuilder non-null in this case, but no need to check
else {
- return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ return new FluoQuery(constructBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 3396114..ed18d49 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -63,14 +63,28 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
* <tr> <td>Node ID</td> <td>filterMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr>
* <tr> <td>Node ID</td> <td>filterMetadata:veriableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
- * <tr> <td>Node ID</td> <td>filterMetadata:originalSparql</td> <td>The original SPRAQL query this filter was derived from.</td> </tr>
- * <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> <td>Indicates which filter within the original SPARQL query this represents.</td> </tr>
+ * <tr> <td>Node ID</td> <td>filterMetadata:filterSparql</td> <td>A SPARQL query representing this filter.</td> </tr>
* <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node ID this filter emits Binding Sets to.</td> </tr>
* <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node ID of the node that feeds this node Binding Sets.</td> </tr>
* <tr> <td>Node ID + DELIM + Binding Set String</td> <td>filterMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
* </table>
* </p>
* <p>
+ * <b>Periodic Bin Metadata</b>
+ * <table border="1" style="width:100%">
+ * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:period</td> <td>The period size used to form BindingSet bins.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:windowSize</td> <td>The window size used to form BindingSet bins.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:timeUnit</td> <td>The unit of time corresponding to period and window size.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:temporalVariable</td> <td>The BindingSet variable corresponding to event time.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:parentNodeId</td> <td>The parent node for this node.</td> </tr>
+ * <tr> <td>Node ID</td> <td>periodicQueryMetadata:childNodeId</td> <td>The child node for this node.</td> </tr>
+ * <tr> <td>Node ID + DELIM + Binding set String</td> <td>periodicQueryMetadata:bindingSet</td> <td>A binned BindingSet.</td> </tr>
+ * </table>
+ * </p>
+ * <p>
* <b>Join Metadata</b>
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
@@ -117,6 +131,7 @@ public class FluoQueryColumns {
public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata";
public static final String AGGREGATION_METADATA_CF = "aggregationMetadata";
public static final String CONSTRUCT_METADATA_CF = "constructMetadata";
+ public static final String PERIODIC_QUERY_METADATA_CF = "periodicQueryMetadata";
/**
* New triples that have been added to Rya are written as a row in this
@@ -174,13 +189,23 @@ public class FluoQueryColumns {
// Filter Metadata columns.
public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId");
- public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder");
- public static final Column FILTER_ORIGINAL_SPARQL = new Column(FILTER_METADATA_CF, "originalSparql");
- public static final Column FILTER_INDEX_WITHIN_SPARQL = new Column(FILTER_METADATA_CF, "filterIndexWithinSparql");
+ public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "variableOrder");
+ public static final Column FILTER_SPARQL = new Column(FILTER_METADATA_CF, "filterSparql");
public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId");
public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId");
public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet");
-
+
+ // Periodic Bin Metadata columns.
+ public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId");
+ public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder");
+ public static final Column PERIODIC_QUERY_PARENT_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "parentNodeId");
+ public static final Column PERIODIC_QUERY_CHILD_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "childNodeId");
+ public static final Column PERIODIC_QUERY_BINDING_SET = new Column(PERIODIC_QUERY_METADATA_CF, "bindingSet");
+ public static final Column PERIODIC_QUERY_PERIOD = new Column(PERIODIC_QUERY_METADATA_CF, "period");
+ public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize");
+ public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit");
+ public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable");
+
// Join Metadata columns.
public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId");
public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder");
@@ -207,6 +232,18 @@ public class FluoQueryColumns {
public static final Column AGGREGATION_BINDING_SET = new Column(AGGREGATION_METADATA_CF, "bindingSet");
/**
+ * BatchObserver column for processing tasks that need to be broken into
+ * batches. Entries stored stored in this column are of the form Row:
+ * nodeId, Value: BatchInformation. The nodeId indicates the node that the
+ * batch operation will be performed on. All batch operations are performed
+ * on the bindingSet column for the NodeType indicated by the given nodeId.
+ * For example, if the nodeId indicated that the NodeType was
+ * StatementPattern, then the batch operation would be performed on
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_BINDING_SET}.
+ */
+ public static final Column BATCH_COLUMN = new Column("batch","information");
+
+ /**
* Enumerates the {@link Column}s that hold all of the fields for each type
* of node that can compose a query.
*/
@@ -220,6 +257,20 @@ public class FluoQueryColumns {
QUERY_VARIABLE_ORDER,
QUERY_SPARQL,
QUERY_CHILD_NODE_ID)),
+
+
+ /**
+ * The columns a {@link PeriodicBinMetadata} object's fields are stored within.
+ */
+ PERIODIC_QUERY_COLUMNS(
+ Arrays.asList(PERIODIC_QUERY_NODE_ID,
+ PERIODIC_QUERY_VARIABLE_ORDER,
+ PERIODIC_QUERY_PERIOD,
+ PERIODIC_QUERY_WINDOWSIZE,
+ PERIODIC_QUERY_TIMEUNIT,
+ PERIODIC_QUERY_TEMPORAL_VARIABLE,
+ PERIODIC_QUERY_PARENT_NODE_ID,
+ PERIODIC_QUERY_CHILD_NODE_ID)),
/**
* The columns a {@link ConstructQueryMetadata} object's fields are stored within.
@@ -239,8 +290,7 @@ public class FluoQueryColumns {
FILTER_COLUMNS(
Arrays.asList(FILTER_NODE_ID,
FILTER_VARIABLE_ORDER,
- FILTER_ORIGINAL_SPARQL,
- FILTER_INDEX_WITHIN_SPARQL,
+ FILTER_SPARQL,
FILTER_PARENT_NODE_ID,
FILTER_CHILD_NODE_ID)),
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 5e9d654..8675b80 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -26,6 +26,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
@@ -40,6 +41,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -170,8 +172,7 @@ public class FluoQueryMetadataDAO {
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, metadata.getVariableOrder().toString());
- tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, metadata.getOriginalSparql() );
- tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, metadata.getFilterIndexWithinSparql()+"" );
+ tx.set(rowId, FluoQueryColumns.FILTER_SPARQL, metadata.getFilterSparql() );
tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, metadata.getParentNodeId() );
tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, metadata.getChildNodeId() );
}
@@ -195,8 +196,7 @@ public class FluoQueryMetadataDAO {
final String rowId = nodeId;
final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.FILTER_VARIABLE_ORDER,
- FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
- FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
+ FluoQueryColumns.FILTER_SPARQL,
FluoQueryColumns.FILTER_PARENT_NODE_ID,
FluoQueryColumns.FILTER_CHILD_NODE_ID);
@@ -204,18 +204,88 @@ public class FluoQueryMetadataDAO {
final String varOrderString = values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER);
final VariableOrder varOrder = new VariableOrder(varOrderString);
- final String originalSparql = values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL);
- final int filterIndexWithinSparql = Integer.parseInt(values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL));
+ final String originalSparql = values.get(FluoQueryColumns.FILTER_SPARQL);
final String parentNodeId = values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID);
final String childNodeId = values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID);
- return FilterMetadata.builder(nodeId)
+ return FilterMetadata.builder(nodeId).setVarOrder(varOrder).setFilterSparql(originalSparql)
+ .setParentNodeId(parentNodeId).setChildNodeId(childNodeId);
+ }
+
+ /**
+ * Write an instance of {@link PeriodicQueryMetadata} to the Fluo table.
+ *
+ * @param tx
+ * - The transaction that will be used to commit the metadata.
+ * (not null)
+ * @param metadata
+ * - The PeriodicBin node metadata that will be written to the
+ * table. (not null)
+ */
+ public void write(final TransactionBase tx, final PeriodicQueryMetadata metadata) {
+ requireNonNull(tx);
+ requireNonNull(metadata);
+
+ final String rowId = metadata.getNodeId();
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_NODE_ID, rowId);
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, metadata.getParentNodeId());
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID, metadata.getChildNodeId());
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PERIOD, Long.toString(metadata.getPeriod()));
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE, Long.toString(metadata.getWindowSize()));
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, metadata.getUnit().name());
+ tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE, metadata.getTemporalVariable());
+ }
+
+ /**
+ * Read an instance of {@link PeriodicQueryMetadata} from the Fluo table.
+ *
+ * @param sx
+ * - The snapshot that will be used to read the metadata. (not
+ * null)
+ * @param nodeId
+ * - The nodeId of the PeriodicBin node that will be read. (not
+ * null)
+ * @return The {@link PeriodicQueryMetadata} that was read from table.
+ */
+ public PeriodicQueryMetadata readPeriodicQueryMetadata(final SnapshotBase sx, final String nodeId) {
+ return readPeriodicQueryMetadataBuilder(sx, nodeId).build();
+ }
+
+ private PeriodicQueryMetadata.Builder readPeriodicQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+ requireNonNull(sx);
+ requireNonNull(nodeId);
+
+ // Fetch the values from the Fluo table.
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER,
+ FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID,
+ FluoQueryColumns.PERIODIC_QUERY_PERIOD, FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE,
+ FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE);
+
+ // Return an object holding them.
+ final String varOrderString = values.get(FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER);
+ final VariableOrder varOrder = new VariableOrder(varOrderString);
+ final String parentNodeId = values.get(FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID);
+ final String childNodeId = values.get(FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID);
+ final String temporalVariable = values.get(FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE);
+ final String period = values.get(FluoQueryColumns.PERIODIC_QUERY_PERIOD);
+ final String window = values.get(FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE);
+ final String timeUnit = values.get(FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT);
+
+ return PeriodicQueryMetadata.builder()
+ .setNodeId(nodeId)
.setVarOrder(varOrder)
- .setOriginalSparql(originalSparql)
- .setFilterIndexWithinSparql(filterIndexWithinSparql)
.setParentNodeId(parentNodeId)
- .setChildNodeId(childNodeId);
+ .setChildNodeId(childNodeId)
+ .setWindowSize(Long.parseLong(window))
+ .setPeriod(Long.parseLong(period))
+ .setTemporalVariable(temporalVariable)
+ .setUnit(TimeUnit.valueOf(timeUnit));
+
}
+
+
/**
* Write an instance of {@link JoinMetadata} to the Fluo table.
@@ -325,12 +395,10 @@ public class FluoQueryMetadataDAO {
final String pattern = values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN);
final String parentNodeId = values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID);
- return StatementPatternMetadata.builder(nodeId)
- .setVarOrder(varOrder)
- .setStatementPattern(pattern)
- .setParentNodeId(parentNodeId);
+ return StatementPatternMetadata.builder(nodeId).setVarOrder(varOrder).setStatementPattern(pattern).setParentNodeId(parentNodeId);
}
+
/**
* Write an instance of {@link AggregationMetadata} to the Fluo table.
*
@@ -432,10 +500,11 @@ public class FluoQueryMetadataDAO {
requireNonNull(query);
// Write the rest of the metadata objects.
- switch(query.getQueryType()) {
+ switch (query.getQueryType()) {
case Construct:
ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
- // Store the Query ID so that it may be looked up from the original SPARQL string.
+ // Store the Query ID so that it may be looked up from the original
+ // SPARQL string.
final String constructSparql = constructMetadata.getSparql();
final String constructQueryId = constructMetadata.getNodeId();
tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId));
@@ -443,13 +512,19 @@ public class FluoQueryMetadataDAO {
break;
case Projection:
QueryMetadata queryMetadata = query.getQueryMetadata().get();
- // Store the Query ID so that it may be looked up from the original SPARQL string.
+ // Store the Query ID so that it may be looked up from the original
+ // SPARQL string.
final String sparql = queryMetadata.getSparql();
final String queryId = queryMetadata.getNodeId();
tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
write(tx, queryMetadata);
break;
}
+
+ Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata();
+ if(periodicMetadata.isPresent()) {
+ write(tx, periodicMetadata.get());
+ }
for(final FilterMetadata filter : query.getFilterMetadata()) {
write(tx, filter);
@@ -510,6 +585,15 @@ public class FluoQueryMetadataDAO {
addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId());
break;
+ case PERIODIC_QUERY:
+ // Add this node's metadata.
+ final PeriodicQueryMetadata.Builder periodicQueryBuilder = readPeriodicQueryMetadataBuilder(sx, childNodeId);
+ builder.addPeriodicQueryMetadata(periodicQueryBuilder);
+
+ // Add it's child's metadata.
+ addChildMetadata(sx, builder, periodicQueryBuilder.build().getChildNodeId());
+ break;
+
case AGGREGATION:
// Add this node's metadata.
final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
@@ -546,6 +630,7 @@ public class FluoQueryMetadataDAO {
break;
default:
break;
+
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
new file mode 100644
index 0000000..33253f2
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata that is required for periodic queries in the Rya Fluo Application.
+ * If a periodic query is registered with the Rya Fluo application, the BindingSets
+ * are placed into temporal bins according to whether they occur within the window of
+ * a period's ending time. This Metadata is used to create a Bin Id, which is equivalent
+ * to the period's ending time, to be inserted into each BindingSet that occurs within that
+ * bin. This is to allow the AggregationUpdater to aggregate the bins by grouping on the
+ * Bin Id.
+ *
+ */
+public class PeriodicQueryMetadata extends CommonNodeMetadata {
+
+ private String parentNodeId;
+ private String childNodeId;
+ private long windowSize;
+ private long period;
+ private TimeUnit unit;
+ private String temporalVariable;
+
+ /**
+ * Constructs an instance of PeriodicQueryMetadata
+ * @param nodeId - id of periodic query node
+ * @param varOrder - variable order indicating the order the BindingSet results are written in
+ * @param parentNodeId - id of parent node
+ * @param childNodeId - id of child node
+ * @param windowSize - size of window used for filtering
+ * @param period - period size that indicates frequency of notifications
+ * @param unit - TimeUnit corresponding to window and period
+ * @param temporalVariable - temporal variable that periodic conditions are applied to
+ */
+ public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, String parentNodeId, String childNodeId, long windowSize, long period,
+ TimeUnit unit, String temporalVariable) {
+ super(nodeId, varOrder);
+ this.parentNodeId = Preconditions.checkNotNull(parentNodeId);
+ this.childNodeId = Preconditions.checkNotNull(childNodeId);
+ this.temporalVariable = Preconditions.checkNotNull(temporalVariable);
+ this.unit = Preconditions.checkNotNull(unit);
+ Preconditions.checkArgument(period > 0);
+ Preconditions.checkArgument(windowSize >= period);
+
+ this.windowSize = windowSize;
+ this.period = period;
+ }
+
+ /**
+ * @return id of parent for navigating query
+ */
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ *
+ * @return id of child for navigating query
+ */
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
+ /**
+ *
+ * @return temporal variable used for filtering events
+ */
+ public String getTemporalVariable() {
+ return temporalVariable;
+ }
+
+ /**
+ * @return window duration in millis
+ */
+ public long getWindowSize() {
+ return windowSize;
+ }
+
+ /**
+ * @return period duration in millis
+ */
+ public long getPeriod() {
+ return period;
+ }
+
+ /**
+ * @return {@link TimeUnit} for window duration and period duration
+ */
+ public TimeUnit getUnit() {
+ return unit;
+ }
+
+
+ /**
+ * @return {@link Builder} for chaining method calls to construct an instance of PeriodicQueryMetadata.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, parentNodeId, temporalVariable, period, windowSize, unit);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (o == this) {
+ return true;
+ }
+
+ if (o instanceof PeriodicQueryMetadata) {
+ if (super.equals(o)) {
+ PeriodicQueryMetadata metadata = (PeriodicQueryMetadata) o;
+ return new EqualsBuilder().append(childNodeId, metadata.childNodeId).append(parentNodeId, metadata.parentNodeId)
+ .append(windowSize, metadata.windowSize).append(period, metadata.period)
+ .append(unit, metadata.unit).append(temporalVariable, metadata.temporalVariable).isEquals();
+ }
+ return false;
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("PeriodicQueryMetadata {\n")
+ .append(" Node ID: " + super.getNodeId() + "\n")
+ .append(" Variable Order: " + super.getVariableOrder() + "\n")
+ .append(" Parent Node ID: " + parentNodeId + "\n")
+ .append(" Child Node ID: " + childNodeId + "\n")
+ .append(" Period: " + period + "\n")
+ .append(" Window Size: " + windowSize + "\n")
+ .append(" Time Unit: " + unit + "\n")
+ .append(" Temporal Variable: " + temporalVariable + "\n")
+ .append("}")
+ .toString();
+ }
+
+
+ /**
+ * Builder for chaining method calls to construct an instance of PeriodicQueryMetadata.
+ */
+ public static class Builder {
+
+ private String nodeId;
+ private VariableOrder varOrder;
+ private String parentNodeId;
+ private String childNodeId;
+ private long windowSize;
+ private long period;
+ private TimeUnit unit;
+ public String temporalVariable;
+
+ public Builder setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
+
+ /**
+ *
+ * @return id of of this node
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Set the {@link VariableOrder}
+ * @param varOrder to indicate order that results will be written in
+ * @return Builder for chaining methods calls
+ */
+ public Builder setVarOrder(VariableOrder varOrder) {
+ this.varOrder = varOrder;
+ return this;
+ }
+
+ /**
+ * Returns {@link VariableOrder}
+ * @return VariableOrder that indicates order that results are written in
+ */
+ public VariableOrder getVarOrder() {
+ return varOrder;
+ }
+
+ /**
+ * Sets id of parent node
+ * @param parentNodeId
+ * @return Builder for chaining methods calls
+ */
+ public Builder setParentNodeId(String parentNodeId) {
+ this.parentNodeId = parentNodeId;
+ return this;
+ }
+
+ /**
+ * @return id of parent node
+ */
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ * Set id of child node
+ * @param childNodeId
+ * @return Builder for chaining methods calls
+ */
+ public Builder setChildNodeId(String childNodeId) {
+ this.childNodeId = childNodeId;
+ return this;
+ }
+
+ /**
+ * Sets window size for periodic query
+ * @param windowSize
+ * @return Builder for chaining methods calls
+ */
+ public Builder setWindowSize(long windowSize) {
+ this.windowSize = windowSize;
+ return this;
+ }
+
+ /**
+ * Sets period for periodic query
+ * @param period
+ * @return Builder for chaining methods calls
+ */
+ public Builder setPeriod(long period) {
+ this.period = period;
+ return this;
+ }
+
+ /**
+ * Sets time unit of window and period for periodic query
+ * @param unit
+ * @return Builder for chaining methods calls
+ */
+ public Builder setUnit(TimeUnit unit) {
+ this.unit = unit;
+ return this;
+ }
+
+ /**
+ * Indicate which variable in BindingSet results is the temporal variable that periodic
+ * Conditions should be applied to
+ * @param temporalVariable
+ * @return Builder for chaining methods calls
+ */
+ public Builder setTemporalVariable(String temporalVariable) {
+ this.temporalVariable = temporalVariable;
+ return this;
+ }
+
+ /**
+ * @return PeriodicQueryMetadata constructed from parameters passed to this Builder
+ */
+ public PeriodicQueryMetadata build() {
+ return new PeriodicQueryMetadata(nodeId, varOrder, parentNodeId, childNodeId, windowSize, period, unit, temporalVariable);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
new file mode 100644
index 0000000..f1ade59
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * This is a {@link UnaryTupleOperator} that gets placed in the parsed query
+ * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL String that
+ * contains the Periodic {@link Function} {@link PeriodicQueryUtil#PeriodicQueryURI}.
+ * The PeiodicQueryNode is created from the arguments passed to the Periodic Function,
+ * which consist of a time unit, a temporal period, a temporal window of time, and the
+ * temporal variable in the query, which assumes a value indicated by the
+ * Time ontology: http://www.w3.org/2006/time. The purpose of the PeriodicQueryNode
+ * is to filter out all events that did not occur within the specified window of time
+ * of this instant and to generate notifications at a regular interval indicated by the period.
+ *
+ */
+public class PeriodicQueryNode extends UnaryTupleOperator {
+
+ private TimeUnit unit;
+ private long windowDuration;
+ private long periodDuration;
+ private String temporalVar;
+
+ /**
+ * Creates a PeriodicQueryNode from the specified values.
+ * @param window - specifies the window of time that event must occur within from this instant
+ * @param period - regular interval at which notifications are generated (must be leq window).
+ * @param unit - time unit of the period and window
+ * @param temporalVar - temporal variable in query used for filtering
+ * @param arg - child of PeriodicQueryNode in parsed query
+ */
+ public PeriodicQueryNode(long window, long period, TimeUnit unit, String temporalVar, TupleExpr arg) {
+ super(checkNotNull(arg));
+ checkArgument(0 < period && period <= window);
+ this.temporalVar = checkNotNull(temporalVar);
+ this.unit = checkNotNull(unit);
+ this.windowDuration = window;
+ this.periodDuration = period;
+ }
+
+ /**
+ * @return - temporal variable used to filter events
+ */
+ public String getTemporalVariable() {
+ return temporalVar;
+ }
+
+ /**
+ * @return window duration in millis
+ */
+ public long getWindowSize() {
+ return windowDuration;
+ }
+
+ /**
+ * @return period duration in millis
+ */
+ public long getPeriod() {
+ return periodDuration;
+ }
+
+ /**
+ * @return {@link TimeUnit} for window duration and period duration
+ */
+ public TimeUnit getUnit() {
+ return unit;
+ }
+
+ @Override
+ public <X extends Exception> void visit(QueryModelVisitor<X> visitor) throws X {
+ visitor.meetOther(this);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if(this == other) {
+ return true;
+ }
+
+ if (other instanceof PeriodicQueryNode) {
+ if (super.equals(other)) {
+ PeriodicQueryNode metadata = (PeriodicQueryNode) other;
+ return new EqualsBuilder().append(windowDuration, metadata.windowDuration).append(periodDuration, metadata.periodDuration)
+ .append(unit, metadata.unit).append(temporalVar, metadata.temporalVar).isEquals();
+ }
+ return false;
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(arg, unit, windowDuration, periodDuration, temporalVar);
+ }
+
+ /**
+ * @return String representation of this node that is printed in when query tree is printed.
+ */
+ @Override
+ public String getSignature() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("PeriodicQueryNode(");
+ sb.append("Var = " + temporalVar + ", ");
+ sb.append("Window = " + windowDuration + " ms, ");
+ sb.append("Period = " + periodDuration + " ms, ");
+ sb.append("Time Unit = " + unit + ")");
+
+
+ return sb.toString();
+ }
+
+ @Override
+ public PeriodicQueryNode clone() {
+ PeriodicQueryNode clone = (PeriodicQueryNode)super.clone();
+ clone.setArg(getArg().clone());
+ clone.periodDuration = periodDuration;
+ clone.windowDuration = windowDuration;
+ clone.unit = unit;
+ clone.temporalVar = temporalVar;
+ return clone;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index 23ac286..d017724 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -142,6 +142,10 @@ public class QueryMetadata extends CommonNodeMetadata {
public Builder(final String nodeId) {
this.nodeId = checkNotNull(nodeId);
}
+
+ public String getNodeId() {
+ return nodeId;
+ }
/**
@@ -154,6 +158,13 @@ public class QueryMetadata extends CommonNodeMetadata {
this.varOrder = varOrder;
return this;
}
+
+ /**
+ * @return the variable order of binding sets that are emitted by this node
+ */
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
/**
* Set the SPARQL query whose results are being updated by the Fluo app.
@@ -176,6 +187,10 @@ public class QueryMetadata extends CommonNodeMetadata {
this.childNodeId = childNodeId;
return this;
}
+
+ public String getChildNodeId() {
+ return childNodeId;
+ }
/**
* @return An instance of {@link QueryMetadata} build using this builder's values.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 631ce60..8e348f2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -18,12 +18,13 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
@@ -40,12 +41,14 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
-import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.openrdf.model.Value;
import org.openrdf.model.impl.BNodeImpl;
@@ -105,7 +108,9 @@ public class SparqlFluoQueryBuilder {
final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
final NewQueryVisitor visitor = new NewQueryVisitor(sparql, fluoQueryBuilder, nodeIds);
- parsedQuery.getTupleExpr().visit( visitor );
+ TupleExpr te = parsedQuery.getTupleExpr();
+ PeriodicQueryUtil.placePeriodicQueryNode(te);
+ te.visit( visitor );
final FluoQuery fluoQuery = fluoQueryBuilder.build();
return fluoQuery;
@@ -187,16 +192,17 @@ public class SparqlFluoQueryBuilder {
prefix = AGGREGATION_PREFIX;
} else if (node instanceof Reduced) {
prefix = CONSTRUCT_PREFIX;
+ } else if(node instanceof PeriodicQueryNode) {
+ prefix = PERIODIC_QUERY_PREFIX;
} else {
throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
}
- // Create the unique portion of the id.
final String unique = UUID.randomUUID().toString().replaceAll("-", "");
-
// Put them together to create the Node ID.
return prefix + "_" + unique;
}
+
}
/**
@@ -204,19 +210,13 @@ public class SparqlFluoQueryBuilder {
* the node to a {@link FluoQuery.Builder}. This information is used by the
* application's observers to incrementally update a PCJ.
*/
- private static class NewQueryVisitor extends QueryModelVisitorBase<RuntimeException> {
+ public static class NewQueryVisitor extends QueryModelVisitorBase<RuntimeException> {
private final NodeIds nodeIds;
private final FluoQuery.Builder fluoQueryBuilder;
private final String sparql;
/**
- * Stored with each Filter node so that we can figure out how to evaluate it within
- * {@link FilterResultUpdater}. Incremented each time a filter has been stored.
- */
- private int filterIndexWithinQuery = 0;
-
- /**
* Constructs an instance of {@link NewQueryVisitor}.
*
* @param sparql - The SPARQL query whose structure will be represented
@@ -378,6 +378,7 @@ public class SparqlFluoQueryBuilder {
@Override
public void meet(final Filter node) {
+
// Get or create a builder for this node populated with the known metadata.
final String filterId = nodeIds.getOrMakeId(node);
@@ -387,8 +388,13 @@ public class SparqlFluoQueryBuilder {
fluoQueryBuilder.addFilterMetadata(filterBuilder);
}
- filterBuilder.setOriginalSparql(sparql);
- filterBuilder.setFilterIndexWithinSparql(filterIndexWithinQuery++);
+ String filterString;
+ try {
+ filterString = FilterSerializer.serialize(node);
+ } catch (FilterParseException e) {
+ throw new RuntimeException(e);
+ }
+ filterBuilder.setFilterSparql(filterString);
final QueryModelNode child = node.getArg();
if(child == null) {
@@ -406,6 +412,47 @@ public class SparqlFluoQueryBuilder {
// Walk to the next node.
super.meet(node);
}
+
+ public void meetOther(final QueryModelNode qNode) {
+ if (qNode instanceof PeriodicQueryNode) {
+ PeriodicQueryNode node = (PeriodicQueryNode) qNode;
+ // Get or create a builder for this node populated with the
+ // known metadata.
+ final String periodicId = nodeIds.getOrMakeId(node);
+
+ PeriodicQueryMetadata.Builder periodicBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+ if (periodicBuilder == null) {
+ periodicBuilder = PeriodicQueryMetadata.builder();
+ periodicBuilder.setNodeId(periodicId);
+ fluoQueryBuilder.addPeriodicQueryMetadata(periodicBuilder);
+ }
+ periodicBuilder.setWindowSize(node.getWindowSize());
+ periodicBuilder.setPeriod(node.getPeriod());
+ periodicBuilder.setTemporalVariable(node.getTemporalVariable());
+ periodicBuilder.setUnit(node.getUnit());
+
+ final QueryModelNode child = node.getArg();
+ if (child == null) {
+ throw new IllegalArgumentException("PeriodicQueryNode child arg connot be null.");
+ }
+
+ final String childNodeId = nodeIds.getOrMakeId(child);
+ periodicBuilder.setChildNodeId(childNodeId);
+
+ // Update the child node's metadata.
+ final Set<String> childVars = getVars((TupleExpr) child);
+ final VariableOrder childVarOrder = new VariableOrder(childVars);
+ setChildMetadata(childNodeId, childVarOrder, periodicId);
+
+ // update variable order of this node and all ancestors to
+ // include BIN_ID binding as
+ // first variable in the ordering
+ PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId);
+ // Walk to the next node.
+ node.getArg().visit(this);
+ }
+ }
+
@Override
public void meet(final Projection node) {
@@ -553,10 +600,24 @@ public class SparqlFluoQueryBuilder {
case QUERY:
throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");
+
case CONSTRUCT:
throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node.");
+
+ case PERIODIC_QUERY:
+ PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
+ if (periodicQueryBuilder == null) {
+ periodicQueryBuilder = PeriodicQueryMetadata.builder();
+ periodicQueryBuilder.setNodeId(childNodeId);
+ fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
+ }
+ periodicQueryBuilder.setVarOrder(childVarOrder);
+ periodicQueryBuilder.setParentNodeId(parentNodeId);
+ break;
+
default:
throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
new file mode 100644
index 0000000..73f3447
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.SingletonSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+/**
+ * Class for creating a String representation a given Filter, and for
+ * converting the String representation of the Filter back to the Filter.
+ *
+ */
+public class FilterSerializer {
+
+ private static final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer();
+ private static final SPARQLParser parser = new SPARQLParser();
+
+ /**
+ * Converts a {@link Filter} to a SPARQL query containing only the SPARQL representation
+ * of the Filter along with a Select clause that return all variables. The argument of the
+ * Filter is replaced by a {@link SingletonSet} so that the body of the SPARQL query consists of only a
+ * single Filter clause.
+ * @param filter - Filter to be serialized
+ * @return - SPARQL String containing a single Filter clause that represents the serialized Filter
+ * @throws FilterParseException
+ */
+ public static String serialize(Filter filter) throws FilterParseException {
+ Filter clone = filter.clone();
+ clone.setArg(new SingletonSet());
+ try {
+ return renderer.render(new ParsedTupleQuery(clone));
+ } catch (Exception e) {
+ throw new FilterParseException("Unable to parse Filter.", e);
+ }
+ }
+
+ /**
+ * Converts a SPARQL query consisting of a single Filter clause back to a Filter.
+ * @param sparql - SPARQL query representing a Filter
+ * @return - parsed Filter included in the SPARQL query
+ * @throws FilterParseException
+ */
+ public static Filter deserialize(String sparql) throws FilterParseException {
+
+ try {
+ ParsedQuery pq = parser.parseQuery(sparql, null);
+ FilterVisitor visitor = new FilterVisitor();
+ pq.getTupleExpr().visit(visitor);
+ Set<Filter> filters = visitor.getFilters();
+
+ if(filters.size() != 1) {
+ throw new FilterParseException("Filter String must contain only one Filter.");
+ }
+
+ return filters.iterator().next();
+
+ } catch (Exception e) {
+ throw new FilterParseException("Unable to parse Filter.", e);
+ }
+ }
+
+ public static class FilterVisitor extends QueryModelVisitorBase<RuntimeException> {
+
+ private Set<Filter> filters;
+
+ public FilterVisitor() {
+ filters = new HashSet<>();
+ }
+
+ public Set<Filter> getFilters() {
+ return filters;
+ }
+
+ public void meet(Filter node) {
+ filters.add(node);
+ }
+ }
+
+ public static class FilterParseException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link FilterParseException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ */
+ public FilterParseException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link FilterParseException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ * @param cause - The exception that caused this one to be thrown.
+ */
+ public FilterParseException(final String message, final Throwable t) {
+ super(message, t);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
new file mode 100644
index 0000000..9446c87
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.Optional;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+
+/**
+ * Factory for creating {@link FluoClient}s.
+ *
+ */
+public class FluoClientFactory {
+
+ /**
+ * Creates a FluoClient
+ * @param appName - name of Fluo application
+ * @param tableName - name of Fluo table
+ * @param conf - AccumuloConfiguration (must contain Accumulo User, Accumulo Instance, Accumulo Password, and Accumulo Zookeepers)
+ * @return FluoClient for connecting to Fluo
+ */
+ public static FluoClient getFluoClient(String appName, Optional<String> tableName, AccumuloRdfConfiguration conf) {
+ FluoConfiguration fluoConfig = new FluoConfiguration();
+ fluoConfig.setAccumuloInstance(conf.getAccumuloInstance());
+ fluoConfig.setAccumuloUser(conf.getAccumuloUser());
+ fluoConfig.setAccumuloPassword(conf.getAccumuloPassword());
+ fluoConfig.setInstanceZookeepers(conf.getAccumuloZookeepers() + "/fluo");
+ fluoConfig.setAccumuloZookeepers(conf.getAccumuloZookeepers());
+ fluoConfig.setApplicationName(appName);
+ if (tableName.isPresent()) {
+ fluoConfig.setAccumuloTable(tableName.get());
+ } else {
+ fluoConfig.setAccumuloTable(appName);
+ }
+ return new FluoClientImpl(fluoConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
new file mode 100644
index 0000000..fd24af2
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Literal;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for creating and executing Perioid Queries.
+ *
+ */
+public class PeriodicQueryUtil {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+ public static final String PeriodicQueryURI = "http://org.apache.rya/function#periodic";
+ public static final String temporalNameSpace = "http://www.w3.org/2006/time#";
+ public static final URI DAYS = vf.createURI("http://www.w3.org/2006/time#days");
+ public static final URI HOURS = vf.createURI("http://www.w3.org/2006/time#hours");
+ public static final URI MINUTES = vf.createURI("http://www.w3.org/2006/time#minutes");
+
+ /**
+ * Returns a PeriodicQueryNode for all {@link FunctionCall}s that represent PeriodicQueryNodes, otherwise
+ * an empty Optional is returned.
+ * @param functionCall - FunctionCall taken from a {@lin TupleExpr}
+ * @param arg - TupleExpr that will be the argument of the PeriodicQueryNode if it is created
+ * @return - Optional containing a PeriodicQueryNode if FunctionCall represents PeriodicQueryNode and empty Optional otherwise
+ * @throws Exception
+ */
+ public static Optional<PeriodicQueryNode> getPeriodicQueryNode(FunctionCall functionCall, TupleExpr arg) throws Exception {
+
+ if (functionCall.getURI().equals(PeriodicQueryURI)) {
+ return Optional.of(parseAndSetValues(functionCall.getArgs(), arg));
+ }
+
+ return Optional.empty();
+ }
+
+ /**
+ * Finds and places a PeriodicQueryNode if the TupleExpr contains a FunctionCall
+ * that represents a PeriodicQueryNode.
+ * @param query - TupleExpr with PeriodicQueryNode placed and positioned at the top of the query
+ */
+ public static void placePeriodicQueryNode(TupleExpr query) {
+ query.visit(new PeriodicQueryNodeVisitor());
+ query.visit(new PeriodicQueryNodeRelocator());
+ }
+
+ public static Optional<PeriodicQueryNode> getPeriodicNode(String sparql) throws MalformedQueryException {
+ TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+ PeriodicQueryNodeVisitor periodicVisitor = new PeriodicQueryNodeVisitor();
+ te.visit(periodicVisitor);
+ return periodicVisitor.getPeriodicNode();
+ }
+
+ /**
+ * Locates Filter containing FunctionCall with PeriodicQuery info and
+ * replaces that Filter with a PeriodicQueryNode.
+ */
+ public static class PeriodicQueryNodeVisitor extends QueryModelVisitorBase<RuntimeException> {
+
+ private int count = 0;
+ private PeriodicQueryNode periodicNode;
+
+ public Optional<PeriodicQueryNode> getPeriodicNode() {
+ return Optional.ofNullable(periodicNode);
+ }
+
+ public void meet(Filter node) {
+ if (node.getCondition() instanceof FunctionCall) {
+ try {
+ Optional<PeriodicQueryNode> optNode = getPeriodicQueryNode((FunctionCall) node.getCondition(), node.getArg());
+ if (optNode.isPresent()) {
+ if (count > 0) {
+ throw new IllegalArgumentException("Query cannot contain more than one PeriodicQueryNode");
+ }
+ periodicNode = optNode.get();
+ node.replaceWith(periodicNode);
+ count++;
+ periodicNode.visit(this);
+ } else {
+ super.meet(node);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ } else {
+ super.meet(node);
+ }
+ }
+ }
+
+ /**
+ * Relocates PeriodicQueryNode so that it occurs below either the Construct
+ * Query Node, the Projection Query Node if no Aggregation exists, or the
+ * Group Node if an Aggregation exists. This limits the number of nodes
+ * whose variable order needs to be changed when the PeriodicQueryMetadata
+ * is added.
+ */
+ public static class PeriodicQueryNodeRelocator extends QueryModelVisitorBase<RuntimeException> {
+
+ private UnaryTupleOperator relocationParent;
+
+ public void meet(Projection node) {
+ relocationParent = node;
+ node.getArg().visit(this);
+ }
+
+ public void meet(Group node) {
+ relocationParent = node;
+ super.meet(node);
+ }
+
+ public void meet(Reduced node) {
+ relocationParent = node;
+ super.meet(node);
+ }
+
+ public void meet(Filter node) {
+ super.meet(node);
+ }
+
+ @Override
+ public void meetOther(QueryModelNode node) {
+
+ if (node instanceof PeriodicQueryNode) {
+ PeriodicQueryNode pNode = (PeriodicQueryNode) node;
+ // do nothing if PeriodicQueryNode already positioned correctly
+ if (pNode.equals(relocationParent.getArg())) {
+ return;
+ }
+ // remove node from query
+ pNode.replaceWith(pNode.getArg());
+ // set node' child to be relocationParent's child
+ pNode.setArg(relocationParent.getArg());
+ // add node back into query below relocationParent
+ relocationParent.replaceChildNode(relocationParent.getArg(), pNode);
+ }
+ }
+ }
+
+ /**
+ * Adds the variable "periodicBinId" to the beginning of all {@link VariableOrder}s for the
+ * Metadata nodes that appear above the PeriodicQueryNode. This ensures that the binId is
+ * written first in the Row so that bins can be easily scanned and deleted.
+ * @param builder
+ * @param nodeId
+ */
+ public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, String nodeId) {
+ NodeType type = NodeType.fromNodeId(nodeId).orNull();
+ if (type == null) {
+ throw new IllegalArgumentException("NodeId must be associated with an existing MetadataBuilder.");
+ }
+ switch (type) {
+ case AGGREGATION:
+ AggregationMetadata.Builder aggBuilder = builder.getAggregateBuilder(nodeId).orNull();
+ if (aggBuilder != null) {
+ VariableOrder varOrder = aggBuilder.getVariableOrder();
+ VariableOrder groupOrder = aggBuilder.getGroupByVariableOrder();
+ // update varOrder with BIN_ID
+ List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
+ orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+ aggBuilder.setVariableOrder(new VariableOrder(orderList));
+ // update groupVarOrder with BIN_ID
+ List<String> groupOrderList = new ArrayList<>(groupOrder.getVariableOrders());
+ groupOrderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+ aggBuilder.setGroupByVariableOrder(new VariableOrder(groupOrderList));
+ // recursive call to update the VariableOrders of all ancestors
+ // of this node
+ updateVarOrdersToIncludeBin(builder, aggBuilder.getParentNodeId());
+ } else {
+ throw new IllegalArgumentException("There is no AggregationMetadata.Builder for the indicated Id.");
+ }
+ break;
+ case PERIODIC_QUERY:
+ PeriodicQueryMetadata.Builder periodicBuilder = builder.getPeriodicQueryBuilder().orNull();
+ if (periodicBuilder != null && periodicBuilder.getNodeId().equals(nodeId)) {
+ VariableOrder varOrder = periodicBuilder.getVarOrder();
+ List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
+ orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+ periodicBuilder.setVarOrder(new VariableOrder(orderList));
+ // recursive call to update the VariableOrders of all ancestors
+ // of this node
+ updateVarOrdersToIncludeBin(builder, periodicBuilder.getParentNodeId());
+ } else {
+ throw new IllegalArgumentException(
+ "PeriodicQueryMetadata.Builder id does not match the indicated id. A query cannot have more than one PeriodicQueryMetadata Node.");
+ }
+ break;
+ case QUERY:
+ QueryMetadata.Builder queryBuilder = builder.getQueryBuilder().orNull();
+ if (queryBuilder != null && queryBuilder.getNodeId().equals(nodeId)) {
+ VariableOrder varOrder = queryBuilder.getVariableOrder();
+ List<String> orderList = new ArrayList<>(varOrder.getVariableOrders());
+ orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID);
+ queryBuilder.setVariableOrder(new VariableOrder(orderList));
+ } else {
+ throw new IllegalArgumentException(
+ "QueryMetadata.Builder id does not match the indicated id. A query cannot have more than one QueryMetadata Node.");
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Incorrectly positioned PeriodicQueryNode. The PeriodicQueryNode can only be positioned below Projections, Extensions, and ConstructQueryNodes.");
+ }
+ }
+
+ /**
+ * Collects all Metadata node Ids that are ancestors of the PeriodicQueryNode and contain the variable
+ * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}.
+ * @param sx - Fluo Snapshot for scanning Fluo
+ * @param nodeId - root node of the PeriodicQuery
+ * @param ids - query ids of all metadata nodes appearing between root and PeriodicQueryMetadata node
+ */
+ public static void getPeriodicQueryNodeAncestorIds(SnapshotBase sx, String nodeId, Set<String> ids) {
+ NodeType nodeType = NodeType.fromNodeId(nodeId).orNull();
+ checkArgument(nodeType != null, "Invalid nodeId: " + nodeId + ". NodeId does not correspond to a valid NodeType.");
+ switch (nodeType) {
+ case FILTER:
+ ids.add(nodeId);
+ getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_CHILD_NODE_ID).toString(), ids);
+ break;
+ case PERIODIC_QUERY:
+ ids.add(nodeId);
+ break;
+ case QUERY:
+ ids.add(nodeId);
+ getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids);
+ break;
+ case AGGREGATION:
+ ids.add(nodeId);
+ getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_CHILD_NODE_ID).toString(), ids);
+ break;
+ default:
+ throw new RuntimeException("Invalid NodeType.");
+ }
+ }
+
+
+
+ /**
+ *
+ * @param values - Values extracted from FunctionCall representing the PeriodicQuery Filter
+ * @param arg - Argument of the PeriodicQueryNode that will be created (PeriodicQueryNode is a UnaryTupleOperator)
+ * @return - PeriodicQueryNode to be inserted in place of the original FunctionCall
+ * @throws Exception
+ */
+ private static PeriodicQueryNode parseAndSetValues(List<ValueExpr> values, TupleExpr arg) throws Exception {
+ // general validation of input
+ Preconditions.checkArgument(values.size() == 4);
+ Preconditions.checkArgument(values.get(0) instanceof Var);
+ Preconditions.checkArgument(values.get(1) instanceof ValueConstant);
+ Preconditions.checkArgument(values.get(2) instanceof ValueConstant);
+ Preconditions.checkArgument(values.get(3) instanceof ValueConstant);
+
+ // get temporal variable
+ Var var = (Var) values.get(0);
+ Preconditions.checkArgument(var.getValue() == null);
+ String tempVar = var.getName();
+
+ // get TimeUnit
+ TimeUnit unit = getTimeUnit((ValueConstant) values.get(3));
+
+ // get window and period durations
+ double windowDuration = parseTemporalDuration((ValueConstant) values.get(1));
+ double periodDuration = parseTemporalDuration((ValueConstant) values.get(2));
+ long windowMillis = convertToMillis(windowDuration, unit);
+ long periodMillis = convertToMillis(periodDuration, unit);
+ // period must evenly divide window at least once
+ Preconditions.checkArgument(windowMillis > periodMillis);
+ Preconditions.checkArgument(windowMillis % periodMillis == 0, "Period duration does not evenly divide window duration.");
+
+ // create PeriodicMetadata.Builder
+ return new PeriodicQueryNode(windowMillis, periodMillis, TimeUnit.MILLISECONDS, tempVar, arg);
+ }
+
+ private static TimeUnit getTimeUnit(ValueConstant val) {
+ Preconditions.checkArgument(val.getValue() instanceof URI);
+ URI uri = (URI) val.getValue();
+ Preconditions.checkArgument(uri.getNamespace().equals(temporalNameSpace));
+
+ switch (uri.getLocalName()) {
+ case "days":
+ return TimeUnit.DAYS;
+ case "hours":
+ return TimeUnit.HOURS;
+ case "minutes":
+ return TimeUnit.MINUTES;
+ default:
+ throw new IllegalArgumentException("Invalid time unit for Periodic Function.");
+ }
+ }
+
+ private static double parseTemporalDuration(ValueConstant valConst) {
+ Value val = valConst.getValue();
+ Preconditions.checkArgument(val instanceof Literal);
+ Literal literal = (Literal) val;
+ String stringVal = literal.getLabel();
+ URI dataType = literal.getDatatype();
+ Preconditions.checkArgument(dataType.equals(XMLSchema.DECIMAL) || dataType.equals(XMLSchema.DOUBLE)
+ || dataType.equals(XMLSchema.FLOAT) || dataType.equals(XMLSchema.INTEGER) || dataType.equals(XMLSchema.INT));
+ return Double.parseDouble(stringVal);
+ }
+
+ private static long convertToMillis(double duration, TimeUnit unit) {
+ Preconditions.checkArgument(duration > 0);
+
+ double convertedDuration = 0;
+ switch (unit) {
+ case DAYS:
+ convertedDuration = duration * 24 * 60 * 60 * 1000;
+ break;
+ case HOURS:
+ convertedDuration = duration * 60 * 60 * 1000;
+ break;
+ case MINUTES:
+ convertedDuration = duration * 60 * 1000;
+ break;
+ default:
+ throw new IllegalArgumentException("TimeUnit must be of type DAYS, HOURS, or MINUTES.");
+ }
+ // check that double representation has exact millis representation
+ Preconditions.checkArgument(convertedDuration == (long) convertedDuration);
+ return (long) convertedDuration;
+ }
+
+}