You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/12/05 19:49:24 UTC
[3/4] incubator-rya git commit: RYA-406. Closes #251.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
index 749a77d..d56574e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
@@ -30,18 +30,18 @@ import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
import com.google.common.base.Preconditions;
/**
- * This class processes {@link SpanBatchDeleteInformation} objects by
- * deleting the entries in the Fluo Column corresponding to the {@link Span}
- * of the BatchInformation object. This class will delete entries until the
- * batch size is met, and then create a new SpanBatchDeleteInformation object
- * with an updated Span whose starting point is the stopping point of this
- * batch. If the batch limit is not met, then a new batch is not created and
- * the task is complete.
+ * This class processes {@link SpanBatchDeleteInformation} objects by deleting the entries in the Fluo Column
+ * corresponding to the {@link Span} of the BatchInformation object. This class will delete entries until the batch size
+ * is met, and then create a new SpanBatchDeleteInformation object with an updated Span whose starting point is the
+ * stopping point of this batch. If the batch limit is not met, then a new batch is not created and the task is
+ * complete.
*
*/
public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
@@ -49,8 +49,8 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
/**
- * Process SpanBatchDeleteInformation objects by deleting all entries indicated
- * by Span until batch limit is met.
+ * Process SpanBatchDeleteInformation objects by deleting all entries indicated by Span until batch limit is met.
+ *
* @param tx - Fluo Transaction
* @param row - Byte row identifying BatchInformation
* @param batch - SpanBatchDeleteInformation object to be processed
@@ -60,6 +60,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
super.processBatch(tx, row, batch);
Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+ Optional<String> nodeId = spanBatch.getNodeId();
Task task = spanBatch.getTask();
int batchSize = spanBatch.getBatchSize();
Span span = spanBatch.getSpan();
@@ -71,7 +72,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
break;
case Delete:
- rowCol = deleteBatch(tx, span, column, batchSize);
+ rowCol = deleteBatch(tx, nodeId, span, column, batchSize);
break;
case Update:
log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
@@ -90,7 +91,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
}
}
- private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) {
+ private Optional<RowColumn> deleteBatch(TransactionBase tx, Optional<String> nodeId, Span span, Column column, int batchSize) {
log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column);
RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
@@ -100,18 +101,39 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
int count = 0;
boolean batchLimitMet = false;
Bytes row = span.getStart().getRow();
+
+ //get prefix if nodeId is specified
+ Optional<Bytes> prefixBytes = Optional.empty();
+ if (nodeId.isPresent()) {
+ NodeType type = NodeType.fromNodeId(nodeId.get()).get();
+ prefixBytes = Optional.ofNullable(Bytes.of(type.getNodeTypePrefix()));
+ }
+
while (colScannerIter.hasNext() && !batchLimitMet) {
ColumnScanner colScanner = colScannerIter.next();
row = colScanner.getRow();
- Iterator<ColumnValue> iter = colScanner.iterator();
- while (iter.hasNext()) {
- if (count >= batchSize) {
- batchLimitMet = true;
- break;
+
+ //extract the nodeId from the returned row if a nodeId was passed
+ //into the SpanBatchInformation. This is to ensure that the returned
+ //row nodeId is equal to the nodeId passed in to the span batch information
+ Optional<String> rowNodeId = Optional.empty();
+ if (prefixBytes.isPresent()) {
+ rowNodeId = Optional.of(BindingSetRow.makeFromShardedRow(prefixBytes.get(), row).getNodeId());
+ }
+
+ //if nodeId is present, then results returned by span are filtered
+ //on the nodeId. This occurs when the hash is not included in the span
+ if (!rowNodeId.isPresent() || rowNodeId.equals(nodeId)) {
+ Iterator<ColumnValue> iter = colScanner.iterator();
+ while (iter.hasNext()) {
+ if (count >= batchSize) {
+ batchLimitMet = true;
+ break;
+ }
+ ColumnValue colVal = iter.next();
+ tx.delete(row, colVal.getColumn());
+ count++;
}
- ColumnValue colVal = iter.next();
- tx.delete(row, colVal.getColumn());
- count++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
index 3b1e245..87158b7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
@@ -1,4 +1,8 @@
package org.apache.rya.indexing.pcj.fluo.app.batch;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Optional;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,22 +23,38 @@ package org.apache.rya.indexing.pcj.fluo.app.batch;
*/
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
/**
* This class represents a batch order to delete all entries in the Fluo table indicated
* by the given Span and Column. These batch orders are processed by the {@link BatchObserver},
* which uses this batch information along with the nodeId passed into the Observer to perform
- * batch deletes.
+ * batch deletes.
*
*/
public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater();
-
- public SpanBatchDeleteInformation(int batchSize, Column column, Span span) {
+ private final Optional<String> nodeId;
+
+ /**
+ * Create a new SpanBatchDeleteInformation object.
+ * @param nodeId - Optional nodeId that is used to filter returned results. Useful if the shard Id
+ * is not included in the Span (see {@link BindingHashShardingFunction} for more info about how sharded
+ * row keys are generated).
+ * @param batchSize - size of batch to be deleted
+ * @param column - column whose entries will be deleted
+ * @param span - Span indicating the range of data to delete. Sometimes the Span cannot contain the hash
+ * (for example, if you are deleting all of the results associated with a nodeId). In this case, a nodeId
+ * should be specified along with a Span equal to the prefix of the nodeId.
+ * @throws NullPointerException if nodeId is null (column, span and batchSize are validated by the superclass).
+ */
+ public SpanBatchDeleteInformation(Optional<String> nodeId, int batchSize, Column column, Span span) {
super(batchSize, Task.Delete, column, span);
+ checkNotNull(nodeId, "nodeId must not be null; use Optional.empty() if no nodeId applies");
+ this.nodeId = nodeId;
}
-
+
/**
* @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column}
*/
@@ -42,17 +62,42 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
public BatchBindingSetUpdater getBatchUpdater() {
return updater;
}
-
-
+
+ /**
+ * Returns an Optional nodeId. If this value is specified, the results
+ * returned from the Fluo scan over the indicated range will be filtered
+ * by the nodeId. The nodeId allows results for a given query nodeId to be
+ * deleted using a Span even if the hash cannot be specified when forming the
+ * rowId in the table.
+ * @return - the nodeId whose results will be batch deleted
+ */
+ public Optional<String> getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("Span Batch Information {\n")
+ .append(" Span: ").append(super.getSpan()).append("\n")
+ .append(" Batch Size: ").append(super.getBatchSize()).append("\n")
+ .append(" Task: ").append(super.getTask()).append("\n")
+ .append(" Column: ").append(super.getColumn()).append("\n")
+ .append(" NodeId: ").append(nodeId).append("\n")
+ .append("}")
+ .toString();
+ }
+
public static Builder builder() {
return new Builder();
}
-
+
public static class Builder {
private int batchSize = DEFAULT_BATCH_SIZE;
private Column column;
private Span span;
+ private Optional<String> nodeId = Optional.empty();
/**
* @param batchSize - {@link Task}s are applied in batches of this size
@@ -74,19 +119,34 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
/**
* @param span - span that batch {@link Task} will be applied to
- *
+ *
*/
public Builder setSpan(Span span) {
this.span = span;
return this;
}
+ /**
+ * Sets the nodeId whose results will be batch deleted. This optional value
+ * allows the {@link SpanBatchBindingSetUpdater} to filter on the indicated
+ * nodeId. Because the results of the Fluo table are sharded, if the Span does
+ * not include the shard, then it is not possible to scan exactly for all results
+ * pertaining to a specific nodeId. In the event that a user wants to delete all nodes
+ * related to a specific entry, this Optional nodeId should be specified to retrieve
+ * only the results associated with the indicated nodeId.
+ * @param nodeId - node whose results will be batch deleted
+ * @return - Builder for chaining method calls
+ */
+ public Builder setNodeId(Optional<String> nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
/**
* @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder
*/
public SpanBatchDeleteInformation build() {
- return new SpanBatchDeleteInformation(batchSize, column, span);
+ return new SpanBatchDeleteInformation(nodeId, batchSize, column, span);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
index 98deb8e..8644c31 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
@@ -1,4 +1,5 @@
package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
* under the License.
*/
import java.lang.reflect.Type;
+import java.util.Optional;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
@@ -37,10 +39,12 @@ import com.google.gson.JsonSerializer;
* JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects.
*
*/
-public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
+public class SpanBatchInformationTypeAdapter
+ implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
@Override
- public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException {
JsonObject json = element.getAsJsonObject();
int batchSize = json.get("batchSize").getAsInt();
String[] colArray = json.get("column").getAsString().split("\u0000");
@@ -49,7 +53,12 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
boolean startInc = json.get("startInc").getAsBoolean();
boolean endInc = json.get("endInc").getAsBoolean();
Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
- return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build();
+ String nodeId = json.has("nodeId") ? json.get("nodeId").getAsString() : ""; // absent in batches serialized before nodeId existed
+ Optional<String> id = Optional.empty();
+ if (!nodeId.isEmpty()) {
+ id = Optional.of(nodeId);
+ }
+ return SpanBatchDeleteInformation.builder().setNodeId(id).setBatchSize(batchSize).setSpan(span).setColumn(column).build();
}
@Override
@@ -63,6 +72,8 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+ String nodeId = batch.getNodeId().orElse("");
+ result.add("nodeId", new JsonPrimitive(nodeId));
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
index e33ea97..20a6b97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.export.rya;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes;
import java.util.Collection;
import java.util.Map;
@@ -54,11 +55,11 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class);
private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
private final FluoClient fluo;
-
+
public RyaSubGraphExporter(FluoClient fluo) {
this.fluo = Preconditions.checkNotNull(fluo);
}
-
+
@Override
public Set<QueryType> getQueryTypes() {
return Sets.newHashSet(QueryType.CONSTRUCT);
@@ -78,12 +79,12 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
insertTriples(fluo.newTransaction(), subgraph.getStatements());
}
-
+
private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
for (final RyaStatement triple : triples) {
Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
try {
- tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+ tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
} catch (final TripleRowResolverException e) {
log.error("Could not convert a Triple into the SPO format: " + triple);
}
@@ -97,10 +98,10 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
* @return The Rya SPO representation of the triple.
* @throws TripleRowResolverException The triple could not be converted.
*/
- private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+ private static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException {
checkNotNull(triple);
final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
- return spoRow.getRow();
+ return addTriplePrefixAndConvertToBytes(spoRow.getRow());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 6147fa8..a8c4d58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -54,8 +55,8 @@ public class AggregationObserver extends BindingSetUpdater {
requireNonNull(tx);
requireNonNull(row);
- // Fetch the Aggregation node's metadata.
- final String nodeId = BindingSetRow.make(row).getNodeId();
+ // Make nodeId and fetch the Aggregation node's metadata.
+ final String nodeId = BindingSetRow.makeFromShardedRow(Bytes.of(AGGREGATION_PREFIX), row).getNodeId();
final AggregationMetadata metadata = queryDao.readAggregationMetadata(tx, nodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 09d9ede..01c9d73 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -18,6 +18,8 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
+
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -25,12 +27,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
/**
* Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -42,7 +44,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
public class ConstructQueryResultObserver extends AbstractObserver {
private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
- protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
@Override
public ObservedColumn getObservedColumn() {
@@ -53,18 +55,18 @@ public class ConstructQueryResultObserver extends AbstractObserver {
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
//Build row for parent that result will be written to
- BindingSetRow bsRow = BindingSetRow.make(row);
+ BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(Bytes.of(CONSTRUCT_PREFIX), row);
String constructNodeId = bsRow.getNodeId();
String bsString= bsRow.getBindingSetString();
String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
- String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
+ Bytes rowBytes = BindingHashShardingFunction.getShardedScanPrefix(parentNodeId, bsString);
//Get NodeType of the parent node
NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
//Get data for the ConstructQuery result
Bytes bytes = tx.get(row, col);
//Write result to parent
- tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
+ tx.set(rowBytes, parentType.getResultColumn(), bytes);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index b4edfea..844343c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class FilterObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Filter metadata.
- final String filterNodeId = BindingSetRow.make(row).getNodeId();
+ final String filterNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(FILTER_PREFIX), row).getNodeId();
final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index c56a98f..f3f409e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class JoinObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Join metadata.
- final String joinNodeId = BindingSetRow.make(row).getNodeId();
+ final String joinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(JOIN_PREFIX), row).getNodeId();
final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
index 7d96baa..87d0ca2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -53,7 +54,7 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Join metadata.
- final String periodicBinNodeId = BindingSetRow.make(row).getNodeId();
+ final String periodicBinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PERIODIC_QUERY_PREFIX), row).getNodeId();
final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId);
// Read the Visibility Binding Set from the Value.
@@ -65,6 +66,6 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
index 5d73b2e..b77bf91 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -48,7 +49,7 @@ public class ProjectionObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Filter metadata.
- final String projectionNodeId = BindingSetRow.make(row).getNodeId();
+ final String projectionNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PROJECTION_PREFIX), row).getNodeId();
final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 78d0ec5..7fa4d38 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -18,13 +18,14 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
@@ -50,7 +51,7 @@ import com.google.common.collect.ImmutableSet;
public class QueryResultObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
- protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
/**
* Builders for each type of {@link IncrementalBindingSetExporter} we support.
*/
@@ -97,10 +98,9 @@ public class QueryResultObserver extends AbstractObserver {
@Override
public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception {
- final String row = brow.toString();
// Read the queryId from the row and get the QueryMetadata.
- final String queryId = row.split(NODEID_BS_DELIM)[0];
+ final String queryId = BindingSetRow.makeFromShardedRow(Bytes.of(QUERY_PREFIX), brow).getNodeId();
final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
// Read the Child Binding Set that will be exported.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 607267a..e3c5b95 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -48,8 +49,8 @@ public class StatementPatternObserver extends BindingSetUpdater {
requireNonNull(tx);
requireNonNull(row);
- // Read the Statement Pattern metadata.
- final String spNodeId = BindingSetRow.make(row).getNodeId();
+ // Make nodeId and get the Statement Pattern metadata.
+ final String spNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), row).getNodeId();
final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index d6fd8bd..83517bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -19,24 +19,23 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import java.util.Map;
+import java.util.Set;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -44,7 +43,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
/**
@@ -56,11 +54,10 @@ public class TripleObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+ private final StatementPatternIdCache SP_ID_CACHE = StatementPatternIdCacheSupplier.getOrCreateCache();
private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
- public TripleObserver() {}
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG);
@@ -71,53 +68,45 @@ public class TripleObserver extends AbstractObserver {
// Get string representation of triple.
final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement);
+ log.trace("Beginging to process triple.");
final String triple = IncUpdateDAO.getTripleString(ryaStatement);
- // Iterate over each of the Statement Patterns that are being matched against.
- final RowScanner spScanner = tx.scanner()
- .over(Span.prefix(SP_PREFIX))
-
- // Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP.
- .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN)
- .byRow()
- .build();
+ Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx);
//see if triple matches conditions of any of the SP
- for (final ColumnScanner colScanner : spScanner) {
- // Get the Statement Pattern's node id.
- final String spID = colScanner.getsRow();
-
+ for (String spID: spIDs) {
// Fetch its metadata.
final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
+ log.trace("Retrieved metadata: {}", spMetadata);
+
// Attempt to match the triple against the pattern.
final String pattern = spMetadata.getStatementPattern();
final VariableOrder varOrder = spMetadata.getVariableOrder();
final String bindingSetString = getBindingSet(triple, pattern, varOrder);
+ log.trace("Created binding set match string: {}", bindingSetString);
+
// Statement matches to a binding set.
if(bindingSetString.length() != 0) {
// Fetch the triple's visibility label.
final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, "");
- // Create the Row ID for the emitted binding set. It does not contain visibilities.
- final String row = spID + NODEID_BS_DELIM + bindingSetString;
- final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) );
+ //Make BindingSet and sharded row
+ final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
+ visBindingSet.setVisibility(visibility);
+ Bytes row = BindingHashShardingFunction.addShard(spID, varOrder, visBindingSet);
// If this is a new Binding Set, then emit it.
- if(tx.get(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
- // Create the Binding Set that goes in the Node Value. It does contain visibilities.
- final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
- visBindingSet.setVisibility(visibility);
-
+ if(tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
try {
final Bytes valueBytes = BS_SERDE.serialize(visBindingSet);
log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n",
tx.getStartTimestamp(), spID, visBindingSet);
- tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
+ tx.set(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
} catch(final Exception e) {
log.error("Couldn't serialize a Binding Set. This value will be skipped.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 6ca0e8d..c30843d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -53,7 +53,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
- * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*
+ * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*
* <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr>
@@ -109,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
- * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
+ * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
* <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
* </table>
* </p>
@@ -171,7 +171,7 @@ public class FluoQueryColumns {
public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies");
public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType");
-
+
// Query Metadata columns.
public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId");
public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars");
@@ -195,7 +195,7 @@ public class FluoQueryColumns {
public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId");
public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId");
public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet");
-
+
// Periodic Bin Metadata columns.
public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId");
public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder");
@@ -206,7 +206,7 @@ public class FluoQueryColumns {
public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize");
public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit");
public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable");
-
+
// Join Metadata columns.
public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId");
public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder");
@@ -246,6 +246,13 @@ public class FluoQueryColumns {
public static final Column BATCH_COLUMN = new Column("batch","information");
/**
+ * Column indicating a set of all StatementPattern ids in the Fluo table. This is used
+ * by the TripleObserver to find the statement patterns that an incoming triple should be matched against.
+ */
+ public static final Column STATEMENT_PATTERN_IDS = new Column("statementPattern", "ids");
+ public static final Column STATEMENT_PATTERN_IDS_HASH = new Column("statementPattern", "hash");
+
+ /**
* Enumerates the {@link Column}s that hold all of the fields for each type
* of node that can compose a query.
*/
@@ -261,7 +268,7 @@ public class FluoQueryColumns {
QUERY_TYPE,
QUERY_EXPORT_STRATEGIES,
QUERY_CHILD_NODE_ID)),
-
+
/**
* The columns a {@link ProjectionMetadata} object's fields are stored within.
*/
@@ -271,8 +278,8 @@ public class FluoQueryColumns {
PROJECTION_VARIABLE_ORDER,
PROJECTION_PARENT_NODE_ID,
PROJECTION_CHILD_NODE_ID)),
-
-
+
+
/**
* The columns a {@link PeriodicBinMetadata} object's fields are stored within.
*/
@@ -297,7 +304,7 @@ public class FluoQueryColumns {
CONSTRUCT_PARENT_NODE_ID,
CONSTRUCT_STATEMENTS)),
-
+
/**
* The columns a {@link FilterMetadata} object's fields are stored within.
*/
@@ -317,7 +324,7 @@ public class FluoQueryColumns {
JOIN_TYPE,
JOIN_PARENT_NODE_ID,
JOIN_LEFT_CHILD_NODE_ID,
- JOIN_BATCH_SIZE,
+ JOIN_BATCH_SIZE,
JOIN_RIGHT_CHILD_NODE_ID)),
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
index 8adc40d..b1b4076 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -18,8 +18,7 @@
*/package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.concurrent.Callable;
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.data.Bytes;
@@ -32,15 +31,18 @@ import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+
/**
* Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
* checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
- * data.
+ * data. The cache has a fixed capacity (determined at construction time) and evicts entries, approximately in
+ * least-recently-used order, as it approaches that capacity.
*
*/
public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+
private final FluoQueryMetadataDAO dao;
private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
private final Cache<String, Bytes> metadataCache;
@@ -49,10 +51,15 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
/**
* Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
- *
* @param capacity - max size of the cache
+ * @param concurrencyLevel - indicates how the cache will be partitioned so that different threads can access those
+ * partitions in a non-serialized manner
+ * @throws NullPointerException if dao is null; IllegalArgumentException if capacity <= 0 or concurrencyLevel <= 0
*/
public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+ checkNotNull(dao);
+ checkArgument(capacity > 0);
+ checkArgument(concurrencyLevel > 0);
this.dao = dao;
commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
@@ -75,166 +82,204 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
return concurrencyLevel;
}
+
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#STATEMENT_PATTERN}.
+ */
@Override
public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
-
+ checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
- return dao.readStatementPatternMetadata(tx, nodeId);
- }
+ return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+ return dao.readStatementPatternMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#JOIN}.
+ */
@Override
public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
- return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readJoinMetadata(tx, nodeId);
- }
+ return (JoinMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readJoinMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#FILTER}.
+ */
@Override
public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readFilterMetadata(tx, nodeId);
- }
+ return (FilterMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readFilterMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#PROJECTION}.
+ */
@Override
public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
- LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
try {
- return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readProjectionMetadata(tx, nodeId);
- }
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readProjectionMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#AGGREGATION}.
+ */
@Override
public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readAggregationMetadata(tx, nodeId);
- }
+ return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readAggregationMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#CONSTRUCT}.
+ */
@Override
public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readConstructQueryMetadata(tx, nodeId);
- }
+ return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readConstructQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#PERIODIC_QUERY}.
+ */
@Override
public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readPeriodicQueryMetadata(tx, nodeId);
- }
+ return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readPeriodicQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws NullPointerException if tx or nodeId is null; IllegalArgumentException if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#QUERY}.
+ */
@Override
public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readQueryMetadata(tx, nodeId);
- }
+ return (QueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * Reads specific metadata entries from the cache. This method will retrieve the entry
+ * from the Fluo table if it does not already exist in the cache.
+ * @param tx - Transaction for interacting with Fluo
+ * @param rowId - rowId for metadata entry
+ * @param column - column of metadata entry
+ * @return - value associated with the metadata entry
+ */
public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+ checkNotNull(rowId);
+ checkNotNull(tx);
+ checkNotNull(column);
Optional<NodeType> type = NodeType.fromNodeId(rowId);
+ checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
try {
- checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
- return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
- @Override
- public Bytes call() throws Exception {
- return tx.get(Bytes.of(rowId), column);
- }
- });
+ return metadataCache.get(getKey(rowId, column), () -> tx.get(Bytes.of(rowId), column));
} catch (Exception e) {
throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
}
}
+ /**
+ * Removes all entries from both the node-metadata cache and the metadata-entry cache.
+ */
+ public void clear() {
+ commonNodeMetadataCache.asMap().clear();
+ metadataCache.asMap().clear();
+ }
+
private String getKey(String row, Column column) {
return row + ":" + column.getsQualifier() + ":" + column.getsQualifier();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index c132ad4..55e521e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -47,6 +47,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -617,15 +618,19 @@ public class FluoQueryMetadataDAO {
write(tx, join);
}
+ Set<String> ids = new HashSet<>();
for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
write(tx, statementPattern);
+ ids.add(statementPattern.getNodeId());
}
+ StatementPatternIdManager.addStatementPatternIds(tx, Sets.newHashSet(ids));
for(final AggregationMetadata aggregation : query.getAggregationMetadata()) {
write(tx, aggregation);
}
}
+
/**
* Read an instance of {@link FluoQuery} from the Fluo table.
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
index faab952..761100d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -1,8 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.indexing.pcj.fluo.app.query;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Manages the creation of the {@link FluoQueryMetadataCache} in the Fluo application.
+ * This supplier enforces singleton-like behavior in that it will only create the cache if it
+ * doesn't already exist. The FluoQueryMetadataCache is not a singleton in itself.
+ */
public class MetadataCacheSupplier {
private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
@@ -10,6 +35,7 @@ public class MetadataCacheSupplier {
private static boolean initialized = false;
private static final int DEFAULT_CAPACITY = 10000;
private static final int DEFAULT_CONCURRENCY = 8;
+ private static final ReentrantLock lock = new ReentrantLock();
/**
* Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
@@ -19,21 +45,26 @@ public class MetadataCacheSupplier {
* @param concurrencyLevel - concurrencyLevel used to create a new cache
*/
public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
-        if (!initialized) {
-            LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
-                    concurrencyLevel);
-            CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
-            initialized = true;
-        } else {
-            LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}",
-                    CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+        lock.lock(); // serialize creation so concurrent callers end up sharing one cache
+        try {
+            if (!initialized) {
+                LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+                        concurrencyLevel);
+                CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+                initialized = true;
+            } else if (capacity != CACHE.getCapacity() || concurrencyLevel != CACHE.getConcurrencyLevel()) {
+                LOG.warn( // only warn when the caller's requested settings are actually being ignored
+                        "A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created. Returning existing cache with capacity: {} and concurrencylevel: {}",
+                        capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+            }
+            return CACHE;
+        } finally {
+            lock.unlock();
        }
-        return CACHE;
}
/**
- * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
- * with a default size of 10000 entries and a default concurrency level of 8.
+ * Creates a FluoQueryMetadataCache with a default size of 10000 entries and a default concurrency level of 8.
*
* @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency
*/
@@ -41,4 +72,21 @@ public class MetadataCacheSupplier {
return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
}
+    /**
+     * Discards the current cache, if any, so the next request builds a new one. Meant for integration tests.
+     */
+    public static void clear() {
+        lock.lock();
+        try {
+            if (!initialized) {
+                return;
+            }
+            CACHE.clear();
+            CACHE = null;
+            initialized = false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
new file mode 100644
index 0000000..f1ddb02
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ * <p>
+ * The ids are only reloaded when the hash stored under {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ * differs from the hash the cached ids were loaded under. The cached state is static, so it is shared by
+ * every instance of this class. The hash and the ids are published together through a single volatile
+ * field, so a reader never pairs a new hash with a stale id set.
+ */
+public class StatementPatternIdCache {
+
+    /** Row under which the StatementPattern nodeIds and their hash are stored. */
+    private static final Bytes ROW = Bytes.of(STATEMENT_PATTERN_ID);
+
+    /** Splits the stored id string; empty tokens (e.g. from an empty id list) are dropped. */
+    private static final Splitter ID_SPLITTER = Splitter.on(VAR_DELIM).omitEmptyStrings();
+
+    /** Serializes reloads from the Fluo table. Static because the state it guards is static. */
+    private static final ReentrantLock LOCK = new ReentrantLock();
+
+    /** Current cache contents. Always replaced as a whole, never modified in place. */
+    private static volatile Snapshot snapshot = Snapshot.empty();
+
+    /**
+     * This method retrieves the StatementPattern NodeIds registered in the Fluo table.
+     * To determine whether the StatementPattern NodeIds have changed in the underlying Fluo table,
+     * this class maintains a local hash of the ids. When this method is called, it looks up the
+     * hash of the StatementPattern Id Strings in the Fluo table, and only if it is different
+     * than the local hash will the StatementPattern nodeIds be retrieved from the Fluo table. Otherwise,
+     * this method returns a local cache of the StatementPattern nodeIds. This method is thread safe.
+     *
+     * @param tx - Fluo transaction used to read the hash and, when it has changed, the ids (not null)
+     * @return - Set of StatementPattern nodeIds. This is the cached set itself, so callers must not modify it.
+     */
+    public Set<String> getStatementPatternIds(final TransactionBase tx) {
+        checkNotNull(tx);
+        final Bytes hashBytes = tx.get(ROW, STATEMENT_PATTERN_IDS_HASH);
+        if (hashBytes == null) {
+            // No hash has been written to the table; return whatever is currently cached.
+            return snapshot.ids;
+        }
+        final String hash = hashBytes.toString();
+        final Snapshot current = snapshot;
+        if (current.isLoadedUnder(hash)) {
+            return current.ids;
+        }
+        LOCK.lock();
+        try {
+            // Another thread may have loaded the ids for this hash while this one waited for the lock.
+            final Snapshot latest = snapshot;
+            if (latest.isLoadedUnder(hash)) {
+                return latest.ids;
+            }
+            final Bytes idBytes = tx.get(ROW, STATEMENT_PATTERN_IDS);
+            // Treat a missing ids entry as "no ids" instead of failing with a NullPointerException.
+            final Set<String> ids = idBytes == null ? new HashSet<String>() : Sets.newHashSet(ID_SPLITTER.split(idBytes.toString()));
+            snapshot = new Snapshot(hash, ids);
+            return ids;
+        } finally {
+            LOCK.unlock();
+        }
+    }
+
+    /**
+     * Clears the contents of the cache so that it will be re-populated the next time
+     * {@link StatementPatternIdCache#getStatementPatternIds(TransactionBase)} is called.
+     * Sets already returned to callers are left untouched.
+     */
+    public void clear() {
+        LOCK.lock();
+        try {
+            // Swap in a new snapshot rather than emptying the old set, which callers may still hold.
+            snapshot = Snapshot.empty();
+        } finally {
+            LOCK.unlock();
+        }
+    }
+
+    /**
+     * Pairs a hash with the ids loaded under it. Both fields are final, so a snapshot is safely published
+     * through the volatile {@code snapshot} field.
+     */
+    private static final class Snapshot {
+        /** Hash the ids were loaded under, or {@code null} if nothing has been loaded yet. */
+        private final String hash;
+        private final Set<String> ids;
+
+        private Snapshot(final String hash, final Set<String> ids) {
+            this.hash = hash;
+            this.ids = ids;
+        }
+
+        /** Creates a snapshot with no hash and a new, empty id set. */
+        private static Snapshot empty() {
+            return new Snapshot(null, new HashSet<String>());
+        }
+
+        /** Returns true if these ids were loaded under the given hash. */
+        private boolean isLoadedUnder(final String otherHash) {
+            return hash != null && hash.equals(otherHash);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
new file mode 100644
index 0000000..01264dc
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Supplies the {@link StatementPatternIdCache} used by the Fluo application.
+ * The first request creates the cache, and every later request gets that same instance back until
+ * {@link #clear()} is called. The StatementPatternIdCache class does not enforce a single instance by
+ * itself; this supplier does.
+ */
+public class StatementPatternIdCacheSupplier {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StatementPatternIdCacheSupplier.class);
+    private static final ReentrantLock LOCK = new ReentrantLock();
+
+    /** The supplied cache: {@code null} until first created and again after {@link #clear()}. Guarded by LOCK. */
+    private static StatementPatternIdCache CACHE;
+
+    /**
+     * Hands back the current cache, creating it first if none exists.
+     *
+     * @return - existing StatementPatternIdCache or new cache if one didn't already exist
+     */
+    public static StatementPatternIdCache getOrCreateCache() {
+        LOCK.lock();
+        try {
+            if (CACHE != null) {
+                LOG.debug("A StatementPatternIdCache has already been initialized.");
+                return CACHE;
+            }
+            LOG.debug("Cache has not been initialized. Initializing StatementPatternIdCache");
+            CACHE = new StatementPatternIdCache();
+            return CACHE;
+        } finally {
+            LOCK.unlock();
+        }
+    }
+
+    /**
+     * Clears the stored cache, if there is one, and forgets it so that the next call to
+     * {@link #getOrCreateCache()} creates a new cache.
+     */
+    public static void clear() {
+        LOCK.lock();
+        try {
+            if (CACHE == null) {
+                return;
+            }
+            CACHE.clear();
+            CACHE = null;
+        } finally {
+            LOCK.unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
new file mode 100644
index 0000000..ee4c053
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+    /** Row under which the StatementPattern nodeIds and their hash are stored. */
+    private static final Bytes ROW = Bytes.of(STATEMENT_PATTERN_ID);
+
+    /** Serializes a nodeId set into the single stored string. */
+    private static final Joiner ID_JOINER = Joiner.on(VAR_DELIM);
+
+    /** Literal inverse of {@link #ID_JOINER}; drops empty tokens such as those left by an empty stored set. */
+    private static final Splitter ID_SPLITTER = Splitter.on(VAR_DELIM).omitEmptyStrings();
+
+    /**
+     * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+     * updates the hash of the updated nodeId Set and writes that to the Column
+     * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}. Ids that are already stored are not duplicated.
+     *
+     * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+     * @param ids - ids to add to the StatementPattern nodeId Set
+     */
+    public static void addStatementPatternIds(final TransactionBase tx, final Set<String> ids) {
+        checkNotNull(tx);
+        checkNotNull(ids);
+        final Set<String> storedIds = readStoredIds(tx);
+        storedIds.addAll(ids);
+        writeIds(tx, storedIds);
+    }
+
+    /**
+     * Remove specified Set of ids from the Fluo table and updates the entry with Column
+     * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that
+     * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+     *
+     * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+     * @param ids - ids to remove from the StatementPattern nodeId Set
+     */
+    public static void removeStatementPatternIds(final TransactionBase tx, final Set<String> ids) {
+        checkNotNull(tx);
+        checkNotNull(ids);
+        final Set<String> storedIds = readStoredIds(tx);
+        storedIds.removeAll(ids);
+        // Persist the ids that remain, not the ids that were just removed.
+        writeIds(tx, storedIds);
+    }
+
+    /**
+     * Reads the stored nodeIds in their stored order.
+     *
+     * @return - a new, mutable set; empty if no ids are stored
+     */
+    private static Set<String> readStoredIds(final TransactionBase tx) {
+        final Bytes val = tx.get(ROW, STATEMENT_PATTERN_IDS);
+        if (val == null) {
+            return Sets.newLinkedHashSet();
+        }
+        return Sets.newLinkedHashSet(ID_SPLITTER.split(val.toString()));
+    }
+
+    /**
+     * Writes the nodeId set and the SHA-256 hash of its serialized form, which readers compare to detect changes.
+     */
+    private static void writeIds(final TransactionBase tx, final Set<String> ids) {
+        final String idString = ID_JOINER.join(ids);
+        tx.set(ROW, STATEMENT_PATTERN_IDS, Bytes.of(idString));
+        tx.set(ROW, STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+    }
+}