You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 13:13:12 UTC

[2/3] incubator-rya git commit: RYA-53 Added visibility support

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index bc558a5..700d0fb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -37,6 +37,7 @@ import org.openrdf.query.Binding;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -50,10 +51,11 @@ import io.fluo.api.iterator.ColumnIterator;
 import io.fluo.api.iterator.RowIterator;
 import io.fluo.api.types.Encoder;
 import io.fluo.api.types.StringEncoder;
-import mvm.rya.indexing.external.tupleSet.BindingSetConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Updates the results of a Join node when one of its children has added a
@@ -62,7 +64,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 @ParametersAreNonnullByDefault
 public class JoinResultUpdater {
 
-    private static final BindingSetConverter<String> converter = new BindingSetStringConverter();
+    private static final BindingSetStringConverter idConverter = new BindingSetStringConverter();
+    private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
     private final Encoder encoder = new StringEncoder();
@@ -80,7 +83,7 @@ public class JoinResultUpdater {
     public void updateJoinResults(
             final TransactionBase tx,
             final String childId,
-            final BindingSet childBindingSet,
+            final VisibilityBindingSet childBindingSet,
             final JoinMetadata joinMetadata) throws BindingSetConversionException {
         checkNotNull(tx);
         checkNotNull(childId);
@@ -113,10 +116,10 @@ public class JoinResultUpdater {
         }
 
         // Iterates over the sibling node's BindingSets that join with the new binding set.
-        FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
+        final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
 
         // Iterates over the resulting BindingSets from the join.
-        final Iterator<BindingSet> newJoinResults;
+        final Iterator<VisibilityBindingSet> newJoinResults;
         if(emittingSide == Side.LEFT) {
             newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
         } else {
@@ -124,14 +127,15 @@ public class JoinResultUpdater {
         }
 
         // Insert the new join binding sets to the Fluo table.
-        VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+        final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
         while(newJoinResults.hasNext()) {
-            BindingSet newJoinResult = newJoinResults.next();
-            String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder);
+            final BindingSet newJoinResult = newJoinResults.next();
+            final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder);
+            final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder);
 
-            final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
+            final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId);
             final Column col = FluoQueryColumns.JOIN_BINDING_SET;
-            final Bytes value = encoder.encode(joinBindingSetString);
+            final Bytes value = encoder.encode(joinBindingSetStringValue);
             tx.set(row, col, value);
         }
     }
@@ -143,14 +147,14 @@ public class JoinResultUpdater {
         LEFT, RIGHT;
     }
 
-    private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException {
+    private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException {
         // Get the common variable orders. These are used to build the prefix.
         final VariableOrder childVarOrder = getVarOrder(tx, childId);
         final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
-        List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
+        final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
 
         // Get the Binding strings
-        final String childBindingSetString = converter.convert(childBindingSet, childVarOrder);
+        final String childBindingSetString = valueConverter.convert(childBindingSet, childVarOrder);
         final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
 
         // Create the prefix that will be used to scan for binding sets of the sibling node.
@@ -285,20 +289,20 @@ public class JoinResultUpdater {
     public static interface IterativeJoin {
 
         /**
-         * Invoked when a new {@link BindingSet} is emitted from the left child
+         * Invoked when a new {@link VisibilityBindingSet} is emitted from the left child
          * node of the join. The Fluo table is scanned for results on the right
          * side that will be joined with the new result.
          *
-         * @param newLeftResult - A new BindingSet that has been emitted from
+         * @param newLeftResult - A new VisibilityBindingSet that has been emitted from
          *   the left child node.
          * @param rightResults - The right child node's binding sets that will
          *   be joined with the new left result. (not null)
          * @return The new BindingSet results for the join.
          */
-        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults);
+        public Iterator<VisibilityBindingSet> newLeftResult(VisibilityBindingSet newLeftResult, Iterator<VisibilityBindingSet> rightResults);
 
         /**
-         * Invoked when a new {@link BindingSet} is emitted from the right child
+         * Invoked when a new {@link VisibilityBindingSet} is emitted from the right child
          * node of the join. The Fluo table is scanned for results on the left
          * side that will be joined with the new result.
          *
@@ -308,7 +312,7 @@ public class JoinResultUpdater {
          *   the right child node.
          * @return The new BindingSet results for the join.
          */
-        public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult);
+        public Iterator<VisibilityBindingSet> newRightResult(Iterator<VisibilityBindingSet> leftResults, VisibilityBindingSet newRightResult);
     }
 
     /**
@@ -321,7 +325,7 @@ public class JoinResultUpdater {
      */
     public static final class NaturalJoin implements IterativeJoin {
         @Override
-        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+        public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) {
             checkNotNull(newLeftResult);
             checkNotNull(rightResults);
 
@@ -330,7 +334,7 @@ public class JoinResultUpdater {
         }
 
         @Override
-        public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult) {
+        public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) {
             checkNotNull(leftResults);
             checkNotNull(newRightResult);
 
@@ -349,14 +353,14 @@ public class JoinResultUpdater {
      */
     public static final class LeftOuterJoin implements IterativeJoin {
         @Override
-        public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+        public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) {
             checkNotNull(newLeftResult);
             checkNotNull(rightResults);
 
             // If the required portion does not join with any optional portions,
             // then emit a BindingSet that matches the new left result.
             if(!rightResults.hasNext()) {
-                return Lists.<BindingSet>newArrayList(newLeftResult).iterator();
+                return Lists.<VisibilityBindingSet>newArrayList(newLeftResult).iterator();
             }
 
             // Otherwise, return an iterator that holds the new required result
@@ -365,7 +369,7 @@ public class JoinResultUpdater {
         }
 
         @Override
-        public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> leftResults, final BindingSet newRightResult) {
+        public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) {
             checkNotNull(leftResults);
             checkNotNull(newRightResult);
 
@@ -382,10 +386,10 @@ public class JoinResultUpdater {
      * This is done lazily so that you don't have to load all of the BindingSets
      * into memory at once.
      */
-    private static final class LazyJoiningIterator implements Iterator<BindingSet> {
+    private static final class LazyJoiningIterator implements Iterator<VisibilityBindingSet> {
 
-        private final BindingSet newResult;
-        private final Iterator<BindingSet> joinedResults;
+        private final VisibilityBindingSet newResult;
+        private final Iterator<VisibilityBindingSet> joinedResults;
 
         /**
          * Constructs an instance of {@link LazyJoiningIterator}.
@@ -393,9 +397,9 @@ public class JoinResultUpdater {
          * @param newResult - A binding set that will be joined with some other binding sets. (not null)
          * @param joinResults - The binding sets that will be joined with {@code newResult}. (not null)
          */
-        public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> joinResults) {
+        public LazyJoiningIterator(final VisibilityBindingSet newResult, final Iterator<VisibilityBindingSet> joinResults) {
             this.newResult = checkNotNull(newResult);
-            this.joinedResults = checkNotNull(joinResults);
+            joinedResults = checkNotNull(joinResults);
         }
 
         @Override
@@ -404,18 +408,28 @@ public class JoinResultUpdater {
         }
 
         @Override
-        public BindingSet next() {
+        public VisibilityBindingSet next() {
             final MapBindingSet bs = new MapBindingSet();
 
-            for(Binding binding : newResult) {
+            for(final Binding binding : newResult) {
                 bs.addBinding(binding);
             }
 
-            for(Binding binding : joinedResults.next()) {
+            final VisibilityBindingSet joinResult = joinedResults.next();
+            for(final Binding binding : joinResult) {
                 bs.addBinding(binding);
             }
 
-            return bs;
+            String visibility = "";
+            final Joiner join = Joiner.on(")&(");
+            final String leftVisi = newResult.getVisibility();
+            final String rightVisi = joinResult.getVisibility();
+            if(leftVisi.isEmpty() || rightVisi.isEmpty()) {
+                visibility = (leftVisi + rightVisi).trim();
+            } else {
+                visibility = "(" + join.join(leftVisi, rightVisi) + ")";
+            }
+            return new VisibilityBindingSet(bs, visibility);
         }
 
         @Override
@@ -428,7 +442,7 @@ public class JoinResultUpdater {
      * Iterates over rows that have a Binding Set column and returns the unmarshalled
      * {@link BindingSet}s.
      */
-    private static final class FluoTableIterator implements Iterator<BindingSet> {
+    private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> {
 
         private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
                 FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
@@ -445,7 +459,7 @@ public class JoinResultUpdater {
          * @param varOrder - The Variable Order of binding sets that will be
          *   read from the Fluo Table. (not null)
          */
-        public FluoTableIterator(RowIterator rows, VariableOrder varOrder) {
+        public FluoTableIterator(final RowIterator rows, final VariableOrder varOrder) {
             this.rows = checkNotNull(rows);
             this.varOrder = checkNotNull(varOrder);
         }
@@ -456,7 +470,7 @@ public class JoinResultUpdater {
         }
 
         @Override
-        public BindingSet next() {
+        public VisibilityBindingSet next() {
             final ColumnIterator columns = rows.next().getValue();
 
             while(columns.hasNext()) {
@@ -464,11 +478,7 @@ public class JoinResultUpdater {
                 final Entry<Column, Bytes> entry = columns.next();
                 if(BINDING_SET_COLUMNS.contains(entry.getKey())) {
                     final String bindingSetString = entry.getValue().toString();
-                    try {
-                        return converter.convert(bindingSetString, varOrder);
-                    } catch (BindingSetConversionException e) {
-                        throw new RuntimeException("Could not convert one of the stored BindingSets from a String: " + bindingSetString, e);
-                    }
+                    return (VisibilityBindingSet) valueConverter.convert(bindingSetString, varOrder);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index f3ff089..8e0a6fe 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -26,7 +26,6 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
 
 import io.fluo.api.client.TransactionBase;
@@ -34,8 +33,10 @@ import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.types.Encoder;
 import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Updates the results of a Query node when one of its children has added a
@@ -43,10 +44,10 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 @ParametersAreNonnullByDefault
 public class QueryResultUpdater {
-
     private final Encoder encoder = new StringEncoder();
 
     private final BindingSetStringConverter converter = new BindingSetStringConverter();
+    private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
 
     /**
      * Updates the results of a Query node when one of its children has added a
@@ -58,7 +59,7 @@ public class QueryResultUpdater {
      */
     public void updateQueryResults(
             final TransactionBase tx,
-            final BindingSet childBindingSet,
+            final VisibilityBindingSet childBindingSet,
             final QueryMetadata queryMetadata) {
         checkNotNull(tx);
         checkNotNull(childBindingSet);
@@ -75,11 +76,12 @@ public class QueryResultUpdater {
             }
         }
         final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder);
+        final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder);
 
         // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry.
         final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString);
         final Column col = FluoQueryColumns.QUERY_BINDING_SET;
-        final Bytes value = encoder.encode(queryBindingSetString);
+        final Bytes value = encoder.encode(queryBindingSetValueString);
         tx.set(row, col, value);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
index fbbae33..c2c031c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -20,9 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.export;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.openrdf.query.BindingSet;
-
 import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 
 /**
  * Exports a single Binding Set that is a new result for a SPARQL query to some
@@ -40,7 +39,7 @@ public interface IncrementalResultExporter {
      *   Fluo application. (not null)
      * @throws ResultExportException The result could not be exported.
      */
-    public void export(TypedTransactionBase tx, String queryId, BindingSet result) throws ResultExportException;
+    public void export(TypedTransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
 
     /**
      * A result could not be exported.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
index 4d51798..5c8c719 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
@@ -25,10 +25,10 @@ import java.util.Collections;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.openrdf.query.BindingSet;
 
 import io.fluo.api.data.Bytes;
 import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.PcjTables;
 import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
 
@@ -55,7 +55,7 @@ public class RyaResultExporter implements IncrementalResultExporter {
     public void export(
             final TypedTransactionBase fluoTx,
             final String queryId,
-            final BindingSet result) throws ResultExportException {
+            final VisibilityBindingSet result) throws ResultExportException {
         checkNotNull(fluoTx);
         checkNotNull(queryId);
         checkNotNull(result);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index aa944e4..9bd0148 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -31,13 +31,15 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
 import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
 import io.fluo.api.types.TypedObserver;
 import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 
 /**
@@ -48,6 +50,7 @@ import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversi
 @ParametersAreNonnullByDefault
 public abstract class BindingSetUpdater extends TypedObserver {
 
+    private final Encoder encoder = new StringEncoder();
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -74,9 +77,10 @@ public abstract class BindingSetUpdater extends TypedObserver {
         checkNotNull(row);
         checkNotNull(col);
 
-        final Observation observation = parseObservation( tx, BindingSetRow.make(row) );
+        final String bindingSetString = encoder.decodeString(tx.get(row, col));
+        final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) );
         final String observedNodeId = observation.getObservedNodeId();
-        final BindingSet observedBindingSet = observation.getObservedBindingSet();
+        final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet();
         final String parentNodeId = observation.getParentId();
 
         // Figure out which node needs to handle the new metadata.
@@ -100,7 +104,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
                 final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId);
                 try {
                     joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin);
-                } catch (BindingSetConversionException e) {
+                } catch (final BindingSetConversionException e) {
                     throw new RuntimeException("Could not process a Join node.", e);
                 }
                 break;
@@ -117,7 +121,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
     public static final class Observation {
 
         private final String observedNodeId;
-        private final BindingSet observedBindingSet;
+        private final VisibilityBindingSet observedBindingSet;
         private final String parentNodeId;
 
         /**
@@ -129,7 +133,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
          */
         public Observation(
                 final String observedNodeId,
-                final BindingSet observedBindingSet,
+                final VisibilityBindingSet observedBindingSet,
                 final String parentNodeId) {
             this.observedNodeId = checkNotNull(observedNodeId);
             this.observedBindingSet = checkNotNull(observedBindingSet);
@@ -146,7 +150,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
         /**
          * @return A Binding Set that was just emitted.
          */
-        public BindingSet getObservedBindingSet() {
+        public VisibilityBindingSet getObservedBindingSet() {
             return observedBindingSet;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index 2accde3..fb15934 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Notified when the results of a Filter have been updated to include a new
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 public class FilterObserver extends BindingSetUpdater {
 
-    private final BindingSetStringConverter converter = new BindingSetStringConverter();
+    private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -57,7 +58,7 @@ public class FilterObserver extends BindingSetUpdater {
 
         // Read the Binding Set that was just emmitted by the Filter.
         final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
-        final BindingSet filterBindingSet = converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
+        final VisibilityBindingSet filterBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = filterMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 43b0a4e..a8cd0df 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Notified when the results of a Join have been updated to include a new
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 public class JoinObserver extends BindingSetUpdater {
 
-    private final BindingSetStringConverter converter = new BindingSetStringConverter();
+    private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
 
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -56,7 +57,7 @@ public class JoinObserver extends BindingSetUpdater {
 
         // Read the Binding Set that was just emmitted by the Join.
         final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
-        final BindingSet joinBindingSet = converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
+        final VisibilityBindingSet joinBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = joinMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 7c1a588..fe4dc56 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -29,17 +29,19 @@ import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
 import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
 import io.fluo.api.types.TypedObserver;
 import io.fluo.api.types.TypedTransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Performs incremental result exporting to the configured destinations.
@@ -47,9 +49,9 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 public class QueryResultObserver extends TypedObserver {
     private static final Logger log = Logger.getLogger(QueryResultObserver.class);
 
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
-    private final BindingSetStringConverter converter = new BindingSetStringConverter();
+    private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
+    private static final Encoder ENCODER = new StringEncoder();
+    private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
 
     /**
      * Builders for each type of result exporter we support.
@@ -93,16 +95,16 @@ public class QueryResultObserver extends TypedObserver {
     @Override
     public void process(final TypedTransactionBase tx, final Bytes row, final Column col) {
         // Read the SPARQL query and it Binding Set from the row id.
-        final String[] queryAndBindingSet = row.toString().split(NODEID_BS_DELIM);
+        final String[] queryAndBindingSet = ENCODER.decodeString(row).split(NODEID_BS_DELIM);
         final String queryId = queryAndBindingSet[0];
-        final String bindingSetString = queryAndBindingSet[1];
+        final String bindingSetString = ENCODER.decodeString(tx.get(row, col));
 
         // Fetch the query's Variable Order from the Fluo table.
-        final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId);
+        final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId);
         final VariableOrder varOrder = queryMetadata.getVariableOrder();
 
         // Export the result using each of the provided exporters.
-        BindingSet result = converter.convert(bindingSetString, varOrder);
+        final VisibilityBindingSet result = (VisibilityBindingSet) CONVERTER.convert(bindingSetString, varOrder);
         for(final IncrementalResultExporter exporter : exporters) {
             try {
                 exporter.export(tx, queryId, result);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index ddba9a2..7b1e510 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.openrdf.query.BindingSet;
 
 import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
 import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * Notified when the results of a Statement Pattern have been updated to include
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
  */
 public class StatementPatternObserver extends BindingSetUpdater {
 
-    private final BindingSetStringConverter converter = new BindingSetStringConverter();
+    private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
 
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -54,10 +55,11 @@ public class StatementPatternObserver extends BindingSetUpdater {
         // Read the Statement Pattern metadata.
         final String spNodeId = parsedRow.getNodeId();
         final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
+        final String bindingSetValue = parsedRow.getBindingSetString();
 
         // Read the Binding Set that was just emmitted by the Statement Pattern.
         final VariableOrder spVarOrder = spMetadata.getVariableOrder();
-        final BindingSet spBindingSet = converter.convert(parsedRow.getBindingSetString(), spVarOrder);
+        final VisibilityBindingSet spBindingSet = (VisibilityBindingSet) CONVERTER.convert(bindingSetValue, spVarOrder);
 
         // Figure out which node needs to handle the new metadata.
         final String parentNodeId = spMetadata.getParentNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index d43ffc9..496c0ed 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -40,8 +40,13 @@ import io.fluo.api.data.Column;
 import io.fluo.api.data.Span;
 import io.fluo.api.iterator.ColumnIterator;
 import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
 import io.fluo.api.types.TypedObserver;
 import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
 
 /**
  * An observer that matches new Triples to the Statement Patterns that are part
@@ -50,7 +55,9 @@ import io.fluo.api.types.TypedTransactionBase;
  */
 public class TripleObserver extends TypedObserver {
 
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+    private static final Encoder ENCODER = new StringEncoder();
+    private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
+    private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
 
     public TripleObserver() {
         super(new StringTypeLayer());
@@ -65,6 +72,11 @@ public class TripleObserver extends TypedObserver {
     public void process(final TypedTransactionBase tx, final Bytes row, final Column column) {
         //get string representation of triple
         final String triple = IncUpdateDAO.getTripleString(row);
+        final Bytes visiBytes = tx.get(row, FluoQueryColumns.TRIPLES);
+        String visibility = "";
+        if(visiBytes != null) {
+             visibility = ENCODER.decodeString(visiBytes);
+        }
 
         //get variable metadata for all SP in table
         final ScannerConfiguration sc1 = new ScannerConfiguration();
@@ -75,19 +87,25 @@ public class TripleObserver extends TypedObserver {
         final RowIterator ri = tx.get(sc1);
 
         while(ri.hasNext()) {
-
             final Entry<Bytes, ColumnIterator> next = ri.next();
             final ColumnIterator ci = next.getValue();
             final String spID = next.getKey().toString();
 
-            final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spID);
+            final StatementPatternMetadata spMetadata = QUERY_DAO.readStatementPatternMetadata(tx, spID);
             final String pattern = spMetadata.getStatementPattern();
 
             while(ci.hasNext()) {
                 final String varOrders = ci.next().getValue().toString();
-                final String bindingSet = getBindingSet(triple, pattern, varOrders);
-                if(bindingSet.length() != 0) {
-                    tx.mutate().row(spID + NODEID_BS_DELIM + bindingSet).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(bindingSet);
+                final VariableOrder varOrder = new VariableOrder(varOrders);
+                final String bindingSetString = getBindingSet(triple, pattern, varOrders);
+
+                //The triple matched the statement pattern and produced a binding set
+                if(bindingSetString.length() != 0) {
+                    final VisibilityBindingSet bindingSet = new VisibilityBindingSet(
+                        CONVERTER.convert(bindingSetString, varOrder),
+                        visibility);
+                    final String valueString = CONVERTER.convert(bindingSet, varOrder);
+                    tx.mutate().row(spID + NODEID_BS_DELIM + bindingSetString).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(valueString);
                 }
             }
         }
@@ -96,8 +114,16 @@ public class TripleObserver extends TypedObserver {
         tx.delete(row, column);
     }
 
-    //determines whether triple matches SPID conditions and generates bindingset
-    //whose order is determined by varOrder
+    /**
+     * Determines whether the triple matches the Statement Pattern ID conditions
+     * and, if so, generates a string representation of a BindingSet whose order
+     * is determined by varOrder.
+     * @param triple - The triple to consider.
+     * @param spID - The statement pattern ID
+     * @param varOrder - The variable order
+     * @return The string representation of the BindingSet or an empty string,
+     * signifying the triple did not match the statement pattern ID.
+     */
     private static String getBindingSet(final String triple, final String spID, final String varOrder) {
         final String[] spIdArray = spID.split(DELIM);
         final String[] tripleArray = triple.split(DELIM);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index fa25456..be24ac9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -88,7 +88,7 @@ public class FluoQueryColumns {
     * <p>
     * <table border="1" style="width:100%">
     *   <tr> <th>Fluo Row</td>  <th>Fluo Column</td>  <th>Fluo Value</td> </tr>
-    *   <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>empty</td> </tr>
+    *   <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr>
     * </table>
     */
    public static final Column TRIPLES = new Column("triples", "SPO");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 265ca0f..83985da 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -205,8 +205,8 @@ public class FluoQueryMetadataDAO {
         final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER));
         final VariableOrder varOrder = new VariableOrder(varOrderString);
 
-        String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
-        JoinType joinType = JoinType.valueOf(joinTypeString);
+        final String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
+        final JoinType joinType = JoinType.valueOf(joinTypeString);
 
         final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) );
         final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
index 025c3e7..08d5cef 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
@@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+
 /**
  * Tests the methods of {@link LeftOuterJoin}.
  */
@@ -46,128 +48,134 @@ public class LeftOuterJoinTest {
 
     @Test
     public void newLeftResult_noRightMatches() {
-        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
 
         // There is a new left result.
-        MapBindingSet newLeftResult = new MapBindingSet();
-        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+        final MapBindingSet mapLeftResult = new MapBindingSet();
+        mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
+        final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
 
         // There are no right results that join with the left result.
-        Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+        final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
 
         // Therefore, the left result is a new join result.
-        Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
 
-        Set<BindingSet> newJoinResults = new HashSet<>();
+        final Set<BindingSet> newJoinResults = new HashSet<>();
         while(newJoinResultsIt.hasNext()) {
             newJoinResults.add( newJoinResultsIt.next() );
         }
 
-        Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
 
         assertEquals(expected, newJoinResults);
     }
 
     @Test
     public void newLeftResult_joinsWithRightResults() {
-        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
 
         // There is a new left result.
-        MapBindingSet newLeftResult = new MapBindingSet();
-        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
-        newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+        final MapBindingSet mapLeftResult = new MapBindingSet();
+        mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
+        mapLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+        final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
 
         // There are a few right results that join with the left result.
-        MapBindingSet nameAge = new MapBindingSet();
+        final MapBindingSet nameAge = new MapBindingSet();
         nameAge.addBinding("name", vf.createLiteral("Bob"));
         nameAge.addBinding("age", vf.createLiteral(56));
+        final VisibilityBindingSet visiAge = new VisibilityBindingSet(nameAge);
 
-        MapBindingSet nameHair = new MapBindingSet();
+        final MapBindingSet nameHair = new MapBindingSet();
         nameHair.addBinding("name", vf.createLiteral("Bob"));
         nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+        final VisibilityBindingSet visiHair = new VisibilityBindingSet(nameHair);
 
-        Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+        final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(visiAge, visiHair).iterator();
 
         // Therefore, there are a few new join results that mix the two together.
-        Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
 
-        Set<BindingSet> newJoinResults = new HashSet<>();
+        final Set<BindingSet> newJoinResults = new HashSet<>();
         while(newJoinResultsIt.hasNext()) {
             newJoinResults.add( newJoinResultsIt.next() );
         }
 
-        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        MapBindingSet nameHeightAge = new MapBindingSet();
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        final MapBindingSet nameHeightAge = new MapBindingSet();
         nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
         nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(nameHeightAge);
+        expected.add(new VisibilityBindingSet(nameHeightAge));
 
-        MapBindingSet nameHeightHair = new MapBindingSet();
+        final MapBindingSet nameHeightHair = new MapBindingSet();
         nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
         nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(nameHeightHair);
+        expected.add(new VisibilityBindingSet(nameHeightHair));
 
         assertEquals(expected, newJoinResults);
     }
 
     @Test
     public void newRightResult_noLeftMatches() {
-        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
 
         // There are no left results.
-        Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+        final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
 
         // There is a new right result.
-        MapBindingSet newRightResult = new MapBindingSet();
+        final MapBindingSet newRightResult = new MapBindingSet();
         newRightResult.addBinding("name", vf.createLiteral("Bob"));
 
         // Therefore, there are no new join results.
-        Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
         assertFalse( newJoinResultsIt.hasNext() );
     }
 
     @Test
     public void newRightResult_joinsWithLeftResults() {
-        IterativeJoin leftOuterJoin = new LeftOuterJoin();
+        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
 
         // There are a few left results that join with the new right result.
-        MapBindingSet nameAge = new MapBindingSet();
+        final MapBindingSet nameAge = new MapBindingSet();
         nameAge.addBinding("name", vf.createLiteral("Bob"));
         nameAge.addBinding("age", vf.createLiteral(56));
 
-        MapBindingSet nameHair = new MapBindingSet();
+        final MapBindingSet nameHair = new MapBindingSet();
         nameHair.addBinding("name", vf.createLiteral("Bob"));
         nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
 
-        Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+        final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
+                new VisibilityBindingSet(nameAge),
+                new VisibilityBindingSet(nameHair)).iterator();
 
         // There is a new right result.
-        MapBindingSet newRightResult = new MapBindingSet();
+        final MapBindingSet newRightResult = new MapBindingSet();
         newRightResult.addBinding("name", vf.createLiteral("Bob"));
         newRightResult.addBinding("height", vf.createLiteral("5'9\""));
 
         // Therefore, there are a few new join results that mix the two together.
-        Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
 
-        Set<BindingSet> newJoinResults = new HashSet<>();
+        final Set<BindingSet> newJoinResults = new HashSet<>();
         while(newJoinResultsIt.hasNext()) {
             newJoinResults.add( newJoinResultsIt.next() );
         }
 
-        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        MapBindingSet nameHeightAge = new MapBindingSet();
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        final MapBindingSet nameHeightAge = new MapBindingSet();
         nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
         nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(nameHeightAge);
+        expected.add(new VisibilityBindingSet(nameHeightAge));
 
-        MapBindingSet nameHeightHair = new MapBindingSet();
+        final MapBindingSet nameHeightHair = new MapBindingSet();
         nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
         nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(nameHeightHair);
+        expected.add(new VisibilityBindingSet(nameHeightHair));
 
         assertEquals(expected, newJoinResults);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
index 15023c5..651ea11 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
@@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+
 /**
  * Tests the methods of {@link NaturalJoin}.
  */
@@ -46,120 +48,124 @@ public class NaturalJoinTest {
 
     @Test
     public void newLeftResult_noRightMatches() {
-        IterativeJoin naturalJoin = new NaturalJoin();
+        final IterativeJoin naturalJoin = new NaturalJoin();
 
         // There is a new left result.
-        MapBindingSet newLeftResult = new MapBindingSet();
+        final MapBindingSet newLeftResult = new MapBindingSet();
         newLeftResult.addBinding("name", vf.createLiteral("Bob"));
 
         // There are no right results that join with the left result.
-        Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+        final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
 
         // Therefore, the left result is a new join result.
-        Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
         assertFalse( newJoinResultsIt.hasNext() );
     }
 
     @Test
     public void newLeftResult_joinsWithRightResults() {
-        IterativeJoin naturalJoin = new NaturalJoin();
+        final IterativeJoin naturalJoin = new NaturalJoin();
 
         // There is a new left result.
-        MapBindingSet newLeftResult = new MapBindingSet();
+        final MapBindingSet newLeftResult = new MapBindingSet();
         newLeftResult.addBinding("name", vf.createLiteral("Bob"));
         newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
 
         // There are a few right results that join with the left result.
-        MapBindingSet nameAge = new MapBindingSet();
+        final MapBindingSet nameAge = new MapBindingSet();
         nameAge.addBinding("name", vf.createLiteral("Bob"));
         nameAge.addBinding("age", vf.createLiteral(56));
 
-        MapBindingSet nameHair = new MapBindingSet();
+        final MapBindingSet nameHair = new MapBindingSet();
         nameHair.addBinding("name", vf.createLiteral("Bob"));
         nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
 
-        Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+        final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(
+                new VisibilityBindingSet(nameAge),
+                new VisibilityBindingSet(nameHair)).iterator();
 
         // Therefore, there are a few new join results that mix the two together.
-        Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
 
-        Set<BindingSet> newJoinResults = new HashSet<>();
+        final Set<BindingSet> newJoinResults = new HashSet<>();
         while(newJoinResultsIt.hasNext()) {
             newJoinResults.add( newJoinResultsIt.next() );
         }
 
-        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        MapBindingSet nameHeightAge = new MapBindingSet();
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        final MapBindingSet nameHeightAge = new MapBindingSet();
         nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
         nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(nameHeightAge);
+        expected.add(new VisibilityBindingSet(nameHeightAge));
 
-        MapBindingSet nameHeightHair = new MapBindingSet();
+        final MapBindingSet nameHeightHair = new MapBindingSet();
         nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
         nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(nameHeightHair);
+        expected.add(new VisibilityBindingSet(nameHeightHair));
 
         assertEquals(expected, newJoinResults);
     }
 
     @Test
     public void newRightResult_noLeftMatches() {
-        IterativeJoin naturalJoin = new NaturalJoin();
+        final IterativeJoin naturalJoin = new NaturalJoin();
 
         // There are no left results.
-        Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+        final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
 
         // There is a new right result.
-        MapBindingSet newRightResult = new MapBindingSet();
+        final MapBindingSet newRightResult = new MapBindingSet();
         newRightResult.addBinding("name", vf.createLiteral("Bob"));
 
         // Therefore, there are no new join results.
-        Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
         assertFalse( newJoinResultsIt.hasNext() );
     }
 
     @Test
     public void newRightResult_joinsWithLeftResults() {
-        IterativeJoin naturalJoin = new NaturalJoin();
+        final IterativeJoin naturalJoin = new NaturalJoin();
 
         // There are a few left results that join with the new right result.
-        MapBindingSet nameAge = new MapBindingSet();
+        final MapBindingSet nameAge = new MapBindingSet();
         nameAge.addBinding("name", vf.createLiteral("Bob"));
         nameAge.addBinding("age", vf.createLiteral(56));
 
-        MapBindingSet nameHair = new MapBindingSet();
+        final MapBindingSet nameHair = new MapBindingSet();
         nameHair.addBinding("name", vf.createLiteral("Bob"));
         nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
 
-        Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+        final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
+                new VisibilityBindingSet(nameAge),
+                new VisibilityBindingSet(nameHair)).iterator();
 
         // There is a new right result.
-        MapBindingSet newRightResult = new MapBindingSet();
+        final MapBindingSet newRightResult = new MapBindingSet();
         newRightResult.addBinding("name", vf.createLiteral("Bob"));
         newRightResult.addBinding("height", vf.createLiteral("5'9\""));
 
         // Therefore, there are a few new join results that mix the two together.
-        Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
 
-        Set<BindingSet> newJoinResults = new HashSet<>();
+        final Set<BindingSet> newJoinResults = new HashSet<>();
         while(newJoinResultsIt.hasNext()) {
             newJoinResults.add( newJoinResultsIt.next() );
         }
 
-        Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        MapBindingSet nameHeightAge = new MapBindingSet();
+        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+        final MapBindingSet nameHeightAge = new MapBindingSet();
         nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
         nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(nameHeightAge);
+        expected.add(new VisibilityBindingSet(nameHeightAge));
 
-        MapBindingSet nameHeightHair = new MapBindingSet();
+        final MapBindingSet nameHeightHair = new MapBindingSet();
         nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
         nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
         nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(nameHeightHair);
+        expected.add(new VisibilityBindingSet(nameHeightHair));
 
         assertEquals(expected, newJoinResults);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
index 3168a71..7be539a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
@@ -30,6 +30,8 @@ import org.openrdf.rio.RDFHandlerException;
 import org.openrdf.rio.RDFParser;
 import org.openrdf.rio.helpers.RDFHandlerBase;
 
+import com.google.common.base.Optional;
+
 import io.fluo.api.client.FluoClient;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.resolver.RdfToRyaConversions;
@@ -68,7 +70,7 @@ public class FluoLoader extends RDFHandlerBase {
         // If the buffer is full, flush it to the Fluo table.
         if(buff.size() == FLUSH_SIZE) {
             log.trace("Flushing " + buff.size() + " Statements from the buffer to Fluo.");
-            insertTriples.insert(fluoClient, buff);
+            insertTriples.insert(fluoClient, buff, Optional.<String>absent());
             buff.clear();
         }
 
@@ -83,7 +85,7 @@ public class FluoLoader extends RDFHandlerBase {
 
         if(!buff.isEmpty()) {
             log.trace("Flushing the last " + buff.size() + " Statements from the buffer to Fluo.");
-            insertTriples.insert(fluoClient, buff);
+            insertTriples.insert(fluoClient, buff, Optional.<String>absent());
             buff.clear();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 0cbfa9a..eb7fb17 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -48,6 +48,7 @@ import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.repository.RepositoryException;
 import org.openrdf.sail.SailException;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -60,7 +61,6 @@ import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaType;
 import mvm.rya.api.domain.RyaURI;
 import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.api.resolver.RyaTypeResolverException;
 import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
 import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.PcjTables;
@@ -77,7 +77,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
     private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class);
 
     private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-    
+
     // Employees
     private static final RyaURI alice = new RyaURI("http://Alice");
     private static final RyaURI bob = new RyaURI("http://Bob");
@@ -290,7 +290,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
     private static void loadDataIntoFluo(final FluoClient fluoClient, final Set<RyaStatement> statements) {
         final InsertTriples insertTriples = new InsertTriples();
         for(final RyaStatement statement : statements) {
-            insertTriples.insert(fluoClient, statement);
+            insertTriples.insert(fluoClient, statement, Optional.<String>absent());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 41c4f08..566d2d2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.junit.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.io.Files;
 
 import io.fluo.api.client.FluoFactory;
@@ -75,7 +76,7 @@ public class CountStatementsIT extends ITBase {
         triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
         triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
 
-        new InsertTriples().insert(fluoClient, triples);
+        new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
 
         // Load some statements into the Fluo app.
         final BigInteger count = new CountStatements().countStatements(fluoClient);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 157412a..0e766b1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -33,6 +33,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.junit.Test;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 import mvm.rya.api.domain.RyaStatement;
@@ -77,7 +78,7 @@ public class GetQueryReportIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Wait for the results to finish processing.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index 41c2d7d..6e633f6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -33,6 +33,7 @@ import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.BindingImpl;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 import mvm.rya.api.domain.RyaStatement;
@@ -135,7 +136,7 @@ public class InputIT extends ITBase {
         assertTrue( results.isEmpty() );
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -187,7 +188,7 @@ public class InputIT extends ITBase {
         assertEquals(expected, results);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query also include Frank.
         fluo.waitForObservers();
@@ -244,7 +245,7 @@ public class InputIT extends ITBase {
         assertEquals(expected, results);
 
         // Stream the same Alice triple into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query is still only Alice.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 46db8cd..f408a1c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -33,6 +33,7 @@ import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.BindingImpl;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
 import mvm.rya.api.domain.RyaStatement;
@@ -81,7 +82,7 @@ public class QueryIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -163,7 +164,7 @@ public class QueryIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -224,7 +225,7 @@ public class QueryIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -268,7 +269,7 @@ public class QueryIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index ee3fffd..b75e624 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -43,6 +43,7 @@ import org.openrdf.model.impl.URIImpl;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.BindingImpl;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -50,7 +51,6 @@ import com.google.common.collect.Sets;
 import io.fluo.api.client.Snapshot;
 import io.fluo.api.data.Bytes;
 import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTypeResolverException;
 import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
 import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.PcjTables;
@@ -66,7 +66,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
 public class RyaExportIT extends ITBase {
 
 	private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-	
+
     /**
      * Configure the export observer to use the Mini Accumulo instance as the
      * export destination for new PCJ results.
@@ -138,7 +138,7 @@ public class RyaExportIT extends ITBase {
         new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
 
         // Stream the data into Fluo.
-        new InsertTriples().insert(fluoClient, streamedTriples);
+        new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
 
         // Fetch the exported results from Accumulo once the observers finish working.
         fluo.waitForObservers();