You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/12/05 19:49:24 UTC

[3/4] incubator-rya git commit: RYA-406. Closes #251.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
index 749a77d..d56574e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
@@ -30,18 +30,18 @@ import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 
 import com.google.common.base.Preconditions;
 
 /**
- * This class processes {@link SpanBatchDeleteInformation} objects by
- * deleting the entries in the Fluo Column corresponding to the {@link Span}
- * of the BatchInformation object.  This class will delete entries until the
- * batch size is met, and then create a new SpanBatchDeleteInformation object
- * with an updated Span whose starting point is the stopping point of this
- * batch.  If the batch limit is not met, then a new batch is not created and
- * the task is complete.
+ * This class processes {@link SpanBatchDeleteInformation} objects by deleting the entries in the Fluo Column
+ * corresponding to the {@link Span} of the BatchInformation object. This class will delete entries until the batch size
+ * is met, and then create a new SpanBatchDeleteInformation object with an updated Span whose starting point is the
+ * stopping point of this batch. If the batch limit is not met, then a new batch is not created and the task is
+ * complete.
  *
  */
 public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
@@ -49,8 +49,8 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
     private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
 
     /**
-     * Process SpanBatchDeleteInformation objects by deleting all entries indicated
-     * by Span until batch limit is met.
+     * Process SpanBatchDeleteInformation objects by deleting all entries indicated by Span until batch limit is met.
+     *
      * @param tx - Fluo Transaction
      * @param row - Byte row identifying BatchInformation
      * @param batch - SpanBatchDeleteInformation object to be processed
@@ -60,6 +60,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
         super.processBatch(tx, row, batch);
         Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
         SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+        Optional<String> nodeId = spanBatch.getNodeId();
         Task task = spanBatch.getTask();
         int batchSize = spanBatch.getBatchSize();
         Span span = spanBatch.getSpan();
@@ -71,7 +72,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
             log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
             break;
         case Delete:
-            rowCol = deleteBatch(tx, span, column, batchSize);
+            rowCol = deleteBatch(tx, nodeId, span, column, batchSize);
             break;
         case Update:
             log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
@@ -90,7 +91,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
         }
     }
 
-    private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) {
+    private Optional<RowColumn> deleteBatch(TransactionBase tx, Optional<String> nodeId, Span span, Column column, int batchSize) {
 
         log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column);
         RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
@@ -100,18 +101,39 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
             int count = 0;
             boolean batchLimitMet = false;
             Bytes row = span.getStart().getRow();
+
+            //get prefix if nodeId is specified
+            Optional<Bytes> prefixBytes = Optional.empty();
+            if (nodeId.isPresent()) {
+                NodeType type = NodeType.fromNodeId(nodeId.get()).get();
+                prefixBytes = Optional.ofNullable(Bytes.of(type.getNodeTypePrefix()));
+            }
+
             while (colScannerIter.hasNext() && !batchLimitMet) {
                 ColumnScanner colScanner = colScannerIter.next();
                 row = colScanner.getRow();
-                Iterator<ColumnValue> iter = colScanner.iterator();
-                while (iter.hasNext()) {
-                    if (count >= batchSize) {
-                        batchLimitMet = true;
-                        break;
+
+                //extract the nodeId from the returned row if a nodeId was passed
+                //into the SpanBatchInformation.  This is to ensure that the returned
+                //row nodeId is equal to the nodeId passed in to the span batch information
+                Optional<String> rowNodeId = Optional.empty();
+                if (prefixBytes.isPresent()) {
+                    rowNodeId = Optional.of(BindingSetRow.makeFromShardedRow(prefixBytes.get(), row).getNodeId());
+                }
+
+                //if nodeId is present, then results returned by span are filtered
+                //on the nodeId.  This occurs when the hash is not included in the span
+                if (!rowNodeId.isPresent() || rowNodeId.equals(nodeId)) {
+                    Iterator<ColumnValue> iter = colScanner.iterator();
+                    while (iter.hasNext()) {
+                        if (count >= batchSize) {
+                            batchLimitMet = true;
+                            break;
+                        }
+                        ColumnValue colVal = iter.next();
+                        tx.delete(row, colVal.getColumn());
+                        count++;
                     }
-                    ColumnValue colVal = iter.next();
-                    tx.delete(row, colVal.getColumn());
-                    count++;
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
index 3b1e245..87158b7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
@@ -1,4 +1,8 @@
 package org.apache.rya.indexing.pcj.fluo.app.batch;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Optional;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,22 +23,38 @@ package org.apache.rya.indexing.pcj.fluo.app.batch;
  */
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 
 /**
  * This class represents a batch order to delete all entries in the Fluo table indicated
  * by the given Span and Column.  These batch orders are processed by the {@link BatchObserver},
  * which uses this batch information along with the nodeId passed into the Observer to perform
- * batch deletes.  
+ * batch deletes.
  *
  */
 public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
 
     private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater();
-    
-    public SpanBatchDeleteInformation(int batchSize, Column column, Span span) {
+    private Optional<String> nodeId;
+
+    /**
+     * Create a new SpanBatchDeleteInformation object.
+     * @param nodeId - Optional nodeId that is used to filter returned results.  Useful if the shard Id
+     * is not included in the Span (see {@link BindingHashShardingFunction} for more info about how sharded
+     * row keys are generated).
+     * @param batchSize - size of batch to be deleted
+     * @param column - column whose entries will be deleted
+     * @param span - Span indicating the range of data to delete.  Sometimes the Span cannot contain the hash
+     * (for example, if you are deleting all of the results associated with a nodeId).  In this case, a nodeId
+     * should be specified along with a Span equal to the prefix of the nodeId.
+     * @throws IllegalArgumentException if nodeId, column or span is null, or if batchSize <= 0.
+     */
+    public SpanBatchDeleteInformation(Optional<String> nodeId, int batchSize, Column column, Span span) {
         super(batchSize, Task.Delete, column, span);
+        checkNotNull(nodeId);
+        this.nodeId = nodeId;
     }
-    
+
     /**
      * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column}
      */
@@ -42,17 +62,42 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
     public BatchBindingSetUpdater getBatchUpdater() {
         return updater;
     }
-    
-    
+
+    /**
+     * Returns an Optional nodeId.  If this value is specified, the results
+     * returned from the Fluo scan over the indicated range will be filtered
+     * by the nodeId.  The nodeId allows results for a given query nodeId to be
+     * deleted using a Span even if the hash cannot be specified when forming the
+     * rowId in the table.
+     * @return - the nodeId whose results will be batch deleted
+     */
+    public Optional<String> getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Span Batch Information {\n")
+                .append("    Span: " + super.getSpan() + "\n")
+                .append("    Batch Size: " + super.getBatchSize() + "\n")
+                .append("    Task: " + super.getTask() + "\n")
+                .append("    Column: " + super.getColumn() + "\n")
+                .append("    NodeId: " + nodeId + "\n")
+                .append("}")
+                .toString();
+    }
+
     public static Builder builder() {
         return new Builder();
     }
-    
+
     public static class Builder {
 
         private int batchSize = DEFAULT_BATCH_SIZE;
         private Column column;
         private Span span;
+        private Optional<String> nodeId = Optional.empty();
 
         /**
          * @param batchSize - {@link Task}s are applied in batches of this size
@@ -74,19 +119,34 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
 
         /**
          * @param span - span that batch {@link Task} will be applied to
-         *            
+         *
          */
         public Builder setSpan(Span span) {
             this.span = span;
             return this;
         }
 
+        /**
+         * Sets the nodeId whose results will be batch deleted.  This optional value
+         * allows the {@link SpanBatchBindingSetUpdater} to filter on the indicated
+         * nodeId.  Because the results of the Fluo table are sharded, if the Span does
+         * not include the shard, then it is not possible to scan exactly for all results
+         * pertaining to a specific nodeId.  In the event that a user wants to delete all nodes
+         * related to a specific entry, this Optional nodeId should be specified to retrieve
+         * only the results associated with the indicated nodeId.
+         * @param nodeId - node whose results will be batch deleted
+         * @return - Builder for chaining method calls
+         */
+        public Builder setNodeId(Optional<String> nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
 
         /**
          * @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder
          */
         public SpanBatchDeleteInformation build() {
-            return new SpanBatchDeleteInformation(batchSize, column, span);
+            return new SpanBatchDeleteInformation(nodeId, batchSize, column, span);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
index 98deb8e..8644c31 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
@@ -1,4 +1,5 @@
 package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
  * under the License.
  */
 import java.lang.reflect.Type;
+import java.util.Optional;
 
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
@@ -37,10 +39,12 @@ import com.google.gson.JsonSerializer;
  * JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects.
  *
  */
-public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
+public class SpanBatchInformationTypeAdapter
+        implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
 
     @Override
-    public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+    public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+            throws JsonParseException {
         JsonObject json = element.getAsJsonObject();
         int batchSize = json.get("batchSize").getAsInt();
         String[] colArray = json.get("column").getAsString().split("\u0000");
@@ -49,7 +53,12 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
         boolean startInc = json.get("startInc").getAsBoolean();
         boolean endInc = json.get("endInc").getAsBoolean();
         Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
-        return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build();
+        String nodeId = json.get("nodeId").getAsString();
+        Optional<String> id = Optional.empty();
+        if (!nodeId.isEmpty()) {
+            id = Optional.of(nodeId);
+        }
+        return SpanBatchDeleteInformation.builder().setNodeId(id).setBatchSize(batchSize).setSpan(span).setColumn(column).build();
     }
 
     @Override
@@ -63,6 +72,8 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
         result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
         result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
         result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+        String nodeId = batch.getNodeId().orElse("");
+        result.add("nodeId", new JsonPrimitive(nodeId));
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
index e33ea97..20a6b97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.export.rya;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes;
 
 import java.util.Collection;
 import java.util.Map;
@@ -54,11 +55,11 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
     private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class);
     private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
     private final FluoClient fluo;
-    
+
     public RyaSubGraphExporter(FluoClient fluo) {
         this.fluo = Preconditions.checkNotNull(fluo);
     }
-    
+
     @Override
     public Set<QueryType> getQueryTypes() {
         return Sets.newHashSet(QueryType.CONSTRUCT);
@@ -78,12 +79,12 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
     public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
         insertTriples(fluo.newTransaction(), subgraph.getStatements());
     }
-    
+
     private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
         for (final RyaStatement triple : triples) {
             Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
             try {
-                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+                tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
             } catch (final TripleRowResolverException e) {
                 log.error("Could not convert a Triple into the SPO format: " + triple);
             }
@@ -97,10 +98,10 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
      * @return The Rya SPO representation of the triple.
      * @throws TripleRowResolverException The triple could not be converted.
      */
-    private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+    private static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException {
         checkNotNull(triple);
         final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
         final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
-        return spoRow.getRow();
+        return addTriplePrefixAndConvertToBytes(spoRow.getRow());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 6147fa8..a8c4d58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -54,8 +55,8 @@ public class AggregationObserver extends BindingSetUpdater {
         requireNonNull(tx);
         requireNonNull(row);
 
-        // Fetch the Aggregation node's metadata.
-        final String nodeId = BindingSetRow.make(row).getNodeId();
+        // Make nodeId and fetch the Aggregation node's metadata.
+        final String nodeId = BindingSetRow.makeFromShardedRow(Bytes.of(AGGREGATION_PREFIX), row).getNodeId();
         final AggregationMetadata metadata = queryDao.readAggregationMetadata(tx, nodeId);
 
         // Read the Visibility Binding Set from the value.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 09d9ede..01c9d73 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -18,6 +18,8 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
+
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -25,12 +27,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
 import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 
 /**
  * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -42,7 +44,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 public class ConstructQueryResultObserver extends AbstractObserver {
 
     private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
-    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+    private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -53,18 +55,18 @@ public class ConstructQueryResultObserver extends AbstractObserver {
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
 
         //Build row for parent that result will be written to
-        BindingSetRow bsRow = BindingSetRow.make(row);
+        BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(Bytes.of(CONSTRUCT_PREFIX), row);
         String constructNodeId = bsRow.getNodeId();
         String bsString= bsRow.getBindingSetString();
         String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
-        String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
+        Bytes rowBytes = BindingHashShardingFunction.getShardedScanPrefix(parentNodeId, bsString);
 
         //Get NodeType of the parent node
         NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
         //Get data for the ConstructQuery result
         Bytes bytes = tx.get(row, col);
         //Write result to parent
-        tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
+        tx.set(rowBytes, parentType.getResultColumn(), bytes);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index b4edfea..844343c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class FilterObserver extends BindingSetUpdater {
         requireNonNull(row);
 
         // Read the Filter metadata.
-        final String filterNodeId = BindingSetRow.make(row).getNodeId();
+        final String filterNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(FILTER_PREFIX), row).getNodeId();
         final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId);
 
         // Read the Visibility Binding Set from the value.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index c56a98f..f3f409e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class JoinObserver extends BindingSetUpdater {
         requireNonNull(row);
 
         // Read the Join metadata.
-        final String joinNodeId = BindingSetRow.make(row).getNodeId();
+        final String joinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(JOIN_PREFIX), row).getNodeId();
         final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId);
 
         // Read the Visibility Binding Set from the value.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
index 7d96baa..87d0ca2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -53,7 +54,7 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
         requireNonNull(row);
 
         // Read the Join metadata.
-        final String periodicBinNodeId = BindingSetRow.make(row).getNodeId();
+        final String periodicBinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PERIODIC_QUERY_PREFIX), row).getNodeId();
         final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId);
 
         // Read the Visibility Binding Set from the Value.
@@ -65,6 +66,6 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
 
         return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId);
     }
-    
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
index 5d73b2e..b77bf91 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -48,7 +49,7 @@ public class ProjectionObserver extends BindingSetUpdater {
         requireNonNull(row);
 
         // Read the Filter metadata.
-        final String projectionNodeId = BindingSetRow.make(row).getNodeId();
+        final String projectionNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PROJECTION_PREFIX), row).getNodeId();
         final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId);
 
         // Read the Visibility Binding Set from the value.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 78d0ec5..7fa4d38 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -18,13 +18,14 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
@@ -50,7 +51,7 @@ import com.google.common.collect.ImmutableSet;
 public class QueryResultObserver extends AbstractObserver {
 
     private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
-    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+    private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
     /**
      * Builders for each type of {@link IncrementalBindingSetExporter} we support.
      */
@@ -97,10 +98,9 @@ public class QueryResultObserver extends AbstractObserver {
 
     @Override
     public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception {
-        final String row = brow.toString();
 
         // Read the queryId from the row and get the QueryMetadata.
-        final String queryId = row.split(NODEID_BS_DELIM)[0];
+        final String queryId = BindingSetRow.makeFromShardedRow(Bytes.of(QUERY_PREFIX), brow).getNodeId();
         final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
 
         // Read the Child Binding Set that will be exported.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 607267a..e3c5b95 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
@@ -48,8 +49,8 @@ public class StatementPatternObserver extends BindingSetUpdater {
         requireNonNull(tx);
         requireNonNull(row);
 
-        // Read the Statement Pattern metadata.
-        final String spNodeId = BindingSetRow.make(row).getNodeId();
+        // Parse the node id out of the sharded row, then read the Statement Pattern metadata.
+        final String spNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), row).getNodeId();
         final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
 
         // Read the Visibility Binding Set from the value.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index d6fd8bd..83517bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -19,24 +19,23 @@
 package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
 import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -44,7 +43,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
 
 /**
@@ -56,11 +54,10 @@ public class TripleObserver extends AbstractObserver {
     private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
-    private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+    private final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+    private final StatementPatternIdCache SP_ID_CACHE = StatementPatternIdCacheSupplier.getOrCreateCache();
     private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
 
-    public TripleObserver() {}
-
     @Override
     public ObservedColumn getObservedColumn() {
         return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG);
@@ -71,53 +68,45 @@ public class TripleObserver extends AbstractObserver {
         // Get string representation of triple.
         final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
         log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement);
+        log.trace("Beginging to process triple.");
 
         final String triple = IncUpdateDAO.getTripleString(ryaStatement);
 
-        // Iterate over each of the Statement Patterns that are being matched against.
-        final RowScanner spScanner = tx.scanner()
-                .over(Span.prefix(SP_PREFIX))
-
-                // Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP.
-                .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN)
-                .byRow()
-                .build();
+        Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx);
 
         //see if triple matches conditions of any of the SP
-        for (final ColumnScanner colScanner : spScanner) {
-            // Get the Statement Pattern's node id.
-            final String spID = colScanner.getsRow();
-
+        for (String spID: spIDs) {
             // Fetch its metadata.
             final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
 
+            log.trace("Retrieved metadata: {}", spMetadata);
+
             // Attempt to match the triple against the pattern.
             final String pattern = spMetadata.getStatementPattern();
             final VariableOrder varOrder = spMetadata.getVariableOrder();
             final String bindingSetString = getBindingSet(triple, pattern, varOrder);
 
+            log.trace("Created binding set match string: {}", bindingSetString);
+
             // Statement matches to a binding set.
             if(bindingSetString.length() != 0) {
                 // Fetch the triple's visibility label.
                 final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, "");
 
-                // Create the Row ID for the emitted binding set. It does not contain visibilities.
-                final String row = spID + NODEID_BS_DELIM + bindingSetString;
-                final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) );
+                //Make BindingSet and sharded row
+                final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
+                visBindingSet.setVisibility(visibility);
+                Bytes row = BindingHashShardingFunction.addShard(spID, varOrder, visBindingSet);
 
                 // If this is a new Binding Set, then emit it.
-                if(tx.get(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
-                    // Create the Binding Set that goes in the Node Value. It does contain visibilities.
-                    final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
-                    visBindingSet.setVisibility(visibility);
-
+                if(tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
                     try {
                         final Bytes valueBytes = BS_SERDE.serialize(visBindingSet);
 
                         log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n",
                                 tx.getStartTimestamp(), spID, visBindingSet);
 
-                        tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
+                        tx.set(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
                     } catch(final Exception e) {
                         log.error("Couldn't serialize a Binding Set. This value will be skipped.", e);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 6ca0e8d..c30843d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -53,7 +53,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *   <table border="1" style="width:100%">
  *     <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
  *     <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
- *     <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*     
+ *     <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>
  *     <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr>
  *     <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
  *     <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr>
@@ -109,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *     <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
- *     <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr> 
+ *     <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
  *     <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
  *   </table>
  * </p>
@@ -171,7 +171,7 @@ public class FluoQueryColumns {
     public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
     public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies");
     public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType");
-    
+
     // Query Metadata columns.
     public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId");
     public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars");
@@ -195,7 +195,7 @@ public class FluoQueryColumns {
     public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId");
     public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId");
     public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet");
-    
+
     // Periodic Bin Metadata columns.
     public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId");
     public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder");
@@ -206,7 +206,7 @@ public class FluoQueryColumns {
     public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize");
     public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit");
     public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable");
-    
+
     // Join Metadata columns.
     public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId");
     public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder");
@@ -246,6 +246,13 @@ public class FluoQueryColumns {
     public static final Column BATCH_COLUMN = new Column("batch","information");
 
     /**
+     * Column holding the set of all StatementPattern ids in the Fluo table. This is used
+     * by the TripleObserver to find the Statement Patterns that incoming triples are matched against.
+     */
+    public static final Column STATEMENT_PATTERN_IDS = new Column("statementPattern", "ids");
+    public static final Column STATEMENT_PATTERN_IDS_HASH = new Column("statementPattern", "hash");
+
+    /**
      * Enumerates the {@link Column}s that hold all of the fields for each type
      * of node that can compose a query.
      */
@@ -261,7 +268,7 @@ public class FluoQueryColumns {
                         QUERY_TYPE,
                         QUERY_EXPORT_STRATEGIES,
                         QUERY_CHILD_NODE_ID)),
-        
+
         /**
          * The columns a {@link ProjectionMetadata} object's fields are stored within.
          */
@@ -271,8 +278,8 @@ public class FluoQueryColumns {
                         PROJECTION_VARIABLE_ORDER,
                         PROJECTION_PARENT_NODE_ID,
                         PROJECTION_CHILD_NODE_ID)),
-        
-        
+
+
         /**
          * The columns a {@link PeriodicBinMetadata} object's fields are stored within.
          */
@@ -297,7 +304,7 @@ public class FluoQueryColumns {
                         CONSTRUCT_PARENT_NODE_ID,
                         CONSTRUCT_STATEMENTS)),
 
-        
+
         /**
          * The columns a {@link FilterMetadata} object's fields are stored within.
          */
@@ -317,7 +324,7 @@ public class FluoQueryColumns {
                         JOIN_TYPE,
                         JOIN_PARENT_NODE_ID,
                         JOIN_LEFT_CHILD_NODE_ID,
-                        JOIN_BATCH_SIZE, 
+                        JOIN_BATCH_SIZE,
                         JOIN_RIGHT_CHILD_NODE_ID)),
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
index 8adc40d..b1b4076 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -18,8 +18,7 @@
  */package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.concurrent.Callable;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.fluo.api.client.SnapshotBase;
 import org.apache.fluo.api.data.Bytes;
@@ -32,15 +31,18 @@ import com.google.common.base.Optional;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
+
 /**
  * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
  * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
- * data.
+ * data.  The cache has a fixed capacity (determined at construction time), and evicts the least recently used entries
+ * when space is needed.
  *
  */
 public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
 
     private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+
     private final FluoQueryMetadataDAO dao;
     private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
     private final Cache<String, Bytes> metadataCache;
@@ -49,10 +51,15 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
 
     /**
      * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
-     *
      * @param capacity - max size of the cache
+     * @param concurrencyLevel - indicates how the cache will be partitioned so that different threads can access those
+     * partitions in a non-serialized manner
+     * @throws IllegalArgumentException if capacity <= 0 or concurrencyLevel <= 0 (a null dao throws NullPointerException)
      */
     public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+        checkNotNull(dao);
+        checkArgument(capacity > 0);
+        checkArgument(concurrencyLevel > 0);
         this.dao = dao;
         commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
         metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
@@ -75,166 +82,204 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
         return concurrencyLevel;
     }
 
+
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#STATEMENT_PATTERN}.
+     */
     @Override
     public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
-
+        checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
-                    return dao.readStatementPatternMetadata(tx, nodeId);
-                }
+            return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+                return dao.readStatementPatternMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#JOIN}.
+     */
     @Override
     public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
             LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
-            return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readJoinMetadata(tx, nodeId);
-                }
+            return (JoinMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readJoinMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#FILTER}.
+     */
     @Override
     public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readFilterMetadata(tx, nodeId);
-                }
+            return (FilterMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readFilterMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#PROJECTION}.
+     */
     @Override
     public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
         checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
-        LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
         try {
-            return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readProjectionMetadata(tx, nodeId);
-                }
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readProjectionMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#AGGREGATION}.
+     */
     @Override
     public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readAggregationMetadata(tx, nodeId);
-                }
+            return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readAggregationMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#CONSTRUCT}.
+     */
     @Override
     public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readConstructQueryMetadata(tx, nodeId);
-                }
+            return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readConstructQueryMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null.
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not yield {@link NodeType#PERIODIC_QUERY}.
+     */
     @Override
     public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readPeriodicQueryMetadata(tx, nodeId);
-                }
+            return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readPeriodicQueryMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * @throws NullPointerException if tx or nodeId is null
+     * @throws IllegalArgumentException if {@link NodeType#fromNodeId(String)} does not return an Optional containing {@link NodeType#QUERY}.
+     */
     @Override
     public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+        checkNotNull(nodeId);
+        checkNotNull(tx);
         Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.QUERY); // outside the try so a failed check is not wrapped in RuntimeException
         try {
-            checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
             LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
-            return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
-                @Override
-                public CommonNodeMetadata call() throws Exception {
-                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
-                    return dao.readQueryMetadata(tx, nodeId);
-                }
+            return (QueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+                LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                return dao.readQueryMetadata(tx, nodeId);
             });
         } catch (Exception e) {
             throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
         }
     }
 
+    /**
+     * Reads a single metadata entry through the cache.  The entry is read from the Fluo table
+     * with the supplied transaction if it does not already exist in the cache.
+     * @param tx - Transaction for interacting with Fluo
+     * @param rowId - rowId for metadata entry; the NodeType derived from it must list column among its metadata columns
+     * @param column - column of metadata entry
+     * @return - value associated with the metadata entry
+     */
     public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+        checkNotNull(rowId);
+        checkNotNull(tx);
+        checkNotNull(column);
         Optional<NodeType> type = NodeType.fromNodeId(rowId);
+        checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column)); // outside the try so a failed check is not wrapped in RuntimeException
         try {
-            checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
-            return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
-                @Override
-                public Bytes call() throws Exception {
-                    return tx.get(Bytes.of(rowId), column);
-                }
-            });
+            return metadataCache.get(getKey(rowId, column), () -> tx.get(Bytes.of(rowId), column));
         } catch (Exception e) {
             throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
         }
     }
 
+    /**
+     * Empties both the metadata entry cache and the common node metadata cache.
+     */
+    public void clear() {
+        metadataCache.asMap().clear();
+        commonNodeMetadataCache.asMap().clear();
+    }
+
     private String getKey(String row, Column column) {
-        return row + ":" + column.getsQualifier() + ":" + column.getsQualifier();
+        return row + ":" + column.getsFamily() + ":" + column.getsQualifier(); // include the family: repeating the qualifier made columns that differ only by family share one cache key
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index c132ad4..55e521e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -47,6 +47,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -617,15 +618,19 @@ public class FluoQueryMetadataDAO {
             write(tx, join);
         }
 
+        Set<String> ids = new HashSet<>();
         for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
             write(tx, statementPattern);
+            ids.add(statementPattern.getNodeId());
         }
+        StatementPatternIdManager.addStatementPatternIds(tx, Sets.newHashSet(ids));
 
         for(final AggregationMetadata aggregation : query.getAggregationMetadata()) {
             write(tx, aggregation);
         }
     }
 
+
     /**
      * Read an instance of {@link FluoQuery} from the Fluo table.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
index faab952..761100d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -1,8 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ *  Manages the creation of the {@link FluoQueryMetadataCache} in the Fluo application.
+ *  This supplier enforces singleton like behavior in that it will only create the cache if it
+ *  doesn't already exist.  The FluoQueryMetadataCache is not a singleton in itself.
+ */
 public class MetadataCacheSupplier {
 
     private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
@@ -10,6 +35,7 @@ public class MetadataCacheSupplier {
     private static boolean initialized = false;
     private static final int DEFAULT_CAPACITY = 10000;
     private static final int DEFAULT_CONCURRENCY = 8;
+    private static final ReentrantLock lock = new ReentrantLock();
 
     /**
      * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
@@ -19,21 +45,26 @@ public class MetadataCacheSupplier {
      * @param concurrencyLevel - concurrencyLevel used to create a new cache
      */
     public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
-        if (!initialized) {
-            LOG.debug("Cache has not been initialized.  Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
-                    concurrencyLevel);
-            CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
-            initialized = true;
-        } else {
-            LOG.debug("Cache has already been initialized.  Returning cache with capacity: {} and concurrencylevel: {}",
-                    CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+        lock.lock(); // held across the check and the creation so that only one cache is ever built
+        try {
+            if (!initialized) {
+                LOG.debug("Cache has not been initialized.  Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+                        concurrencyLevel);
+                CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+                initialized = true;
+            } else { // the requested capacity and concurrencyLevel are ignored once a cache exists
+                LOG.warn(
+                        "A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created.  Returning existing cache with capacity: {} and concurrencylevel: {}",
+                        capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+            }
+            return CACHE;
+        } finally {
+            lock.unlock();
         }
-        return CACHE;
     }
 
     /**
-     * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
-     * with a default size of 10000 entries and a default concurrency level of 8.
+     * Creates a FluoQueryMetadataCache with a default size of 10000 entries and a default concurrency level of 8.
      *
      * @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency
      */
@@ -41,4 +72,21 @@ public class MetadataCacheSupplier {
         return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
     }
 
+    /**
+     * Empties and discards the current cache, if any, so the next request builds a fresh one.  Useful for integration tests.
+     */
+    public static void clear() {
+        lock.lock();
+        try {
+            if (!initialized) {
+                return;
+            }
+            CACHE.clear();
+            CACHE = null;
+            initialized = false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
new file mode 100644
index 0000000..f1ddb02
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.  The cached ids and their hash are static, so
+ * they are shared by every instance and guarded by a single static lock.
+ */
+public class StatementPatternIdCache {
+
+    private static final ReentrantLock lock = new ReentrantLock();
+    private static Optional<String> HASH = Optional.empty();
+    private static Set<String> IDS = new HashSet<>();
+
+    /**
+     * Retrieves the StatementPattern NodeIds registered in the Fluo table.  The hash of the ids stored in the
+     * Fluo table is looked up on every call, and the ids are only re-read from the Fluo table when that hash
+     * differs from the locally cached hash; otherwise the locally cached ids are returned.
+     * @param tx - Fluo Transaction used to read the stored hash and ids
+     * @return - Set of StatementPattern nodeIds
+     */
+    public Set<String> getStatementPatternIds(TransactionBase tx) {
+        checkNotNull(tx);
+        Optional<Bytes> hashBytes = Optional.ofNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH));
+        lock.lock();
+        try {
+            // Compare and refresh under the lock so HASH and IDS are always read and updated as a pair.
+            if (hashBytes.isPresent() && !(HASH.isPresent() && HASH.get().equals(hashBytes.get().toString()))) {
+                String ids = tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS).toString();
+                Set<String> refreshed = Sets.newHashSet(ids.split(VAR_DELIM));
+                refreshed.remove(""); // splitting an empty id String yields a single empty element
+                IDS = refreshed;
+                HASH = Optional.of(hashBytes.get().toString());
+            }
+            return IDS;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Clears contents of cache so that it will be re-populated next time
+     * {@link StatementPatternIdCache#getStatementPatternIds(TransactionBase)} is called.
+     */
+    public void clear() {
+        lock.lock();
+        try {
+            HASH = Optional.empty();
+            IDS = new HashSet<>(); // replace rather than empty in place: callers may still hold the old Set
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
new file mode 100644
index 0000000..01264dc
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Manages the creation of the {@link StatementPatternIdCache} in the Fluo application.
+ *  This supplier enforces singleton like behavior in that it will only create the cache if it
+ *  doesn't already exist.  The StatementPatternIdCache is not a singleton in itself.
+ */
+public class StatementPatternIdCacheSupplier {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StatementPatternIdCacheSupplier.class);
+    private static final ReentrantLock lock = new ReentrantLock();
+    private static StatementPatternIdCache CACHE;
+    private static boolean initialized = false;
+
+    /**
+     * Supplies the shared {@link StatementPatternIdCache}, building it on the first call.
+     * @return - the previously created StatementPatternIdCache, or a newly created one if none existed
+     */
+    public static StatementPatternIdCache getOrCreateCache() {
+        lock.lock();
+        try {
+            if (initialized) {
+                LOG.debug("A StatementPatternIdCache has already been initialized.");
+                return CACHE;
+            }
+            LOG.debug("Cache has not been initialized.  Initializing StatementPatternIdCache");
+            CACHE = new StatementPatternIdCache();
+            initialized = true;
+            return CACHE;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Empties the stored cache, discards it, and flags this Supplier as uninitialized.
+     */
+    public static void clear() {
+        lock.lock();
+        try {
+            if (!initialized) {
+                return;
+            }
+            CACHE.clear();
+            CACHE = null;
+            initialized = false;
+        } finally {
+            lock.unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
new file mode 100644
index 0000000..ee4c053
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+    /**
+     * Adds the specified ids to the Set stored under Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS} and
+     * writes the hash of the updated nodeId Set to Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}.
+     * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+     * @param ids - ids to add to the StatementPattern nodeId Set
+     */
+    public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
+        checkNotNull(tx);
+        checkNotNull(ids);
+        Set<String> storedIds = readStoredIds(tx);
+        storedIds.addAll(ids);
+        writeIds(tx, storedIds);
+    }
+
+    /**
+     * Removes the specified ids from the Set stored under Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS} and
+     * writes the hash of the updated nodeId Set to Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}.
+     * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+     * @param ids - ids to remove from the StatementPattern nodeId Set
+     */
+    public static void removeStatementPatternIds(TransactionBase tx, Set<String> ids) {
+        checkNotNull(tx);
+        checkNotNull(ids);
+        Set<String> storedIds = readStoredIds(tx);
+        storedIds.removeAll(ids);
+        writeIds(tx, storedIds); // write what remains, not the ids that were just removed
+    }
+
+    // Reads the stored nodeId Set; returns an empty Set when the entry is absent or holds an empty String.
+    private static Set<String> readStoredIds(TransactionBase tx) {
+        Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+        if (!val.isPresent() || val.get().toString().isEmpty()) {
+            return new HashSet<>();
+        }
+        return Sets.newHashSet(val.get().toString().split(VAR_DELIM));
+    }
+
+    // Writes the joined nodeId Set together with the hash of that exact String, so the two entries always agree.
+    private static void writeIds(TransactionBase tx, Set<String> ids) {
+        String idString = Joiner.on(VAR_DELIM).join(ids);
+        tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+        tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+    }
+
+}