You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/19 00:05:42 UTC

[2/2] incubator-rya git commit: RYA-283-Batch-Observer-Integration. Closes #198.

RYA-283-Batch-Observer-Integration. Closes #198.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/ad60aca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/ad60aca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/ad60aca8

Branch: refs/heads/master
Commit: ad60aca8d8a1002b528aeaa4376e86925dd0b2e0
Parents: e387818
Author: Caleb Meier <ca...@parsons.com>
Authored: Sat Aug 5 16:53:49 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 18 17:04:45 2017 -0700

----------------------------------------------------------------------
 .../indexing/pcj/fluo/api/CreateFluoPcj.java    |  17 +-
 .../pcj/fluo/app/JoinResultUpdater.java         | 146 ++++---
 .../batch/AbstractBatchBindingSetUpdater.java   |   2 +-
 .../fluo/app/batch/JoinBatchInformation.java    |  36 +-
 .../JoinBatchInformationTypeAdapter.java        |   4 +-
 .../pcj/fluo/app/query/FluoQueryColumns.java    |   3 +
 .../fluo/app/query/FluoQueryMetadataDAO.java    |   4 +
 .../pcj/fluo/app/query/JoinMetadata.java        |  34 +-
 .../pcj/fluo/app/query/QueryMetadata.java       |  22 +
 .../fluo/app/query/SparqlFluoQueryBuilder.java  |  12 +
 .../pcj/fluo/app/util/FluoQueryUtils.java       |  12 +
 .../pcj/fluo/app/util/NodeIdCollector.java      |  92 ++++
 .../BatchInformationSerializerTest.java         |   5 +-
 .../fluo/app/query/QueryBuilderVisitorTest.java |   4 -
 .../pcj/fluo/integration/BatchDeleteIT.java     | 316 --------------
 .../indexing/pcj/fluo/integration/BatchIT.java  | 424 +++++++++++++++++++
 .../pcj/fluo/integration/KafkaExportIT.java     |   3 -
 17 files changed, 702 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index e450960..150a256 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -47,9 +47,7 @@ import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -90,6 +88,10 @@ public class CreateFluoPcj {
      * The default Statement Pattern batch insert size is 1000.
      */
     private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
+    /**
+     * The default Join batch size is 5000.
+     */
+    private static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
 
     /**
      * The maximum number of binding sets that will be inserted into each Statement
@@ -98,11 +100,15 @@ public class CreateFluoPcj {
     private final int spInsertBatchSize;
 
     /**
+     * The maximum number of join results that will be processed per transaction.
+     */
+    private final int joinBatchSize;
+    /**
      * Constructs an instance of {@link CreateFluoPcj} that uses
      * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
      */
     public CreateFluoPcj() {
-        this(DEFAULT_SP_INSERT_BATCH_SIZE);
+        this(DEFAULT_SP_INSERT_BATCH_SIZE, DEFAULT_JOIN_BATCH_SIZE);
     }
 
     /**
@@ -111,9 +117,11 @@ public class CreateFluoPcj {
      * @param spInsertBatchSize - The maximum number of binding sets that will be
      *   inserted into each Statement Pattern's result set per Fluo transaction.
      */
-    public CreateFluoPcj(final int spInsertBatchSize) {
+    public CreateFluoPcj(final int spInsertBatchSize, final int joinBatchSize) {
         checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
+        checkArgument(joinBatchSize > 0, "The Join batch size '" + joinBatchSize + "' must be greater than 0.");
         this.spInsertBatchSize = spInsertBatchSize;
+        this.joinBatchSize = joinBatchSize;
     }
     
 
@@ -173,6 +181,7 @@ public class CreateFluoPcj {
         SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
         builder.setFluoQueryId(queryId);
         builder.setSparql(sparql);
+        builder.setJoinBatchSize(joinBatchSize);
         
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 0f448a6..fb3ee0c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -23,6 +23,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -33,14 +34,18 @@ import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.batch.AbstractBatchBindingSetUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -51,7 +56,6 @@ import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -122,14 +126,17 @@ public class JoinResultUpdater {
         }
 
         // Iterates over the sibling node's BindingSets that join with the new binding set.
-        final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childNodeId, childBindingSet, siblingId, tx);
-
+        Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>();
+        Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, siblingId);
+        Column siblingColumn = getScanColumnFamily(siblingId);
+        Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize());
+        
         // Iterates over the resulting BindingSets from the join.
         final Iterator<VisibilityBindingSet> newJoinResults;
         if(emittingSide == Side.LEFT) {
-            newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
+            newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets.iterator());
         } else {
-            newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet);
+            newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets.iterator(), childBindingSet);
         }
 
         // Insert the new join binding sets to the Fluo table.
@@ -152,6 +159,22 @@ public class JoinResultUpdater {
                 tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes);
             }
         }
+        
+        // if batch limit met, there are additional entries to process
+        // update the span and register updated batch job
+        if (rowColumn.isPresent()) {
+            Span newSpan = AbstractBatchBindingSetUpdater.getNewSpan(rowColumn.get(), siblingSpan);
+            JoinBatchInformation joinBatch = JoinBatchInformation.builder()
+                .setBatchSize(joinMetadata.getJoinBatchSize())
+                .setBs(childBindingSet)
+                .setColumn(siblingColumn)
+                .setJoinType(joinMetadata.getJoinType())
+                .setSide(emittingSide)
+                .setSpan(newSpan)
+                .setTask(Task.Add)
+                .build();
+            BatchInformationDAO.addBatch(tx, joinMetadata.getNodeId(), joinBatch);
+        }
     }
 
     /**
@@ -160,8 +183,55 @@ public class JoinResultUpdater {
     public static enum Side {
         LEFT, RIGHT;
     }
+    
+    
+    /**
+     * Fetches batch to be processed by scanning over the Span specified by the
+     * {@link JoinBatchInformation}. The number of results is less than or equal
+     * to the batch size specified by the JoinBatchInformation.
+     * 
+     * @param tx - Fluo transaction in which batch operation is performed
+     * @param siblingSpan - span of sibling to retrieve elements to join with
+     * @param bsSet- set that batch results are added to
+     * @return Set - containing results of sibling scan.
+     * @throws Exception 
+     */
+    private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int batchSize) throws Exception {
+
+        RowScanner rs = tx.scanner().over(siblingSpan).fetch(siblingColumn).byRow().build();
+        Iterator<ColumnScanner> colScannerIter = rs.iterator();
+
+        boolean batchLimitMet = false;
+        Bytes row = siblingSpan.getStart().getRow();
+        while (colScannerIter.hasNext() && !batchLimitMet) {
+            ColumnScanner colScanner = colScannerIter.next();
+            row = colScanner.getRow();
+            Iterator<ColumnValue> iter = colScanner.iterator();
+            while (iter.hasNext() && !batchLimitMet) {
+                bsSet.add(BS_SERDE.deserialize(iter.next().getValue()));
+                //check if batch size has been met and set flag if it has been met
+                if (bsSet.size() >= batchSize) {
+                    batchLimitMet = true;
+                }
+            }
+        }
 
-    private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException {
+        if (batchLimitMet) {
+            return Optional.of(new RowColumn(row, siblingColumn));
+        } else {
+            return Optional.absent();
+        }
+    }
+    
+    /**
+     * Creates a Span for the sibling node to retrieve BindingSets to join with
+     * @param tx
+     * @param childId - Id of the node that was updated
+     * @param childBindingSet - BindingSet update
+     * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update
+     * @return Span to retrieve sibling node's BindingSets to form join results
+     */
+    private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) {
         // Get the common variable orders. These are used to build the prefix.
         final VariableOrder childVarOrder = getVarOrder(tx, childId);
         final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
@@ -184,15 +254,7 @@ public class JoinResultUpdater {
             }
         }
         siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
-
-        // Scan the sibling node's binding sets for those that have the same
-        // common variable values as childBindingSet. These needs to be joined
-        // and inserted into the Join's results. It's possible that none of these
-        // results will be new Join results if they have already been created in
-        // earlier iterations of this algorithm.
-
-        final RowScanner rs = tx.scanner().over(Span.prefix(siblingScanPrefix)).fetch(getScanColumnFamily(siblingId)).byRow().build();
-        return new FluoTableIterator(rs);
+        return Span.prefix(siblingScanPrefix);
     }
 
 
@@ -468,56 +530,4 @@ public class JoinResultUpdater {
         }
     }
 
-    /**
-     * Iterates over rows that have a Binding Set column and returns the unmarshalled
-     * {@link BindingSet}s.
-     */
-    private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> {
-
-        private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
-
-        private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
-                FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
-                FluoQueryColumns.JOIN_BINDING_SET,
-                FluoQueryColumns.FILTER_BINDING_SET);
-
-        private final Iterator<ColumnScanner> rows;
-
-        /**
-         * Constructs an instance of {@link FluoTableIterator}.
-         *
-         * @param rows - Iterates over RowId values in a Fluo Table. (not null)
-         */
-        public FluoTableIterator(final RowScanner rows) {
-            this.rows = checkNotNull(rows).iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return rows.hasNext();
-        }
-
-        @Override
-        public VisibilityBindingSet next() {
-            final ColumnScanner columns = rows.next();
-
-            for (final ColumnValue cv : columns) {
-                if(BINDING_SET_COLUMNS.contains(cv.getColumn())) {
-                    final Bytes value = cv.getValue();
-                    try {
-                        return BS_SERDE.deserialize(value);
-                    } catch (final Exception e) {
-                        throw new RuntimeException("Row did not containing a Binding Set.", e);
-                    }
-                }
-            }
-
-            throw new RuntimeException("Row did not containing a Binding Set.");
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("remove() is unsupported.");
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
index db33d3b..9584a10 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
@@ -38,7 +38,7 @@ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetU
      * @param oldSpan - old Span to be updated with newStart
      * @return - updated Span used with an updated BatchInformation object to complete the batch task
      */
-    public Span getNewSpan(RowColumn newStart, Span oldSpan) {
+    public static Span getNewSpan(RowColumn newStart, Span oldSpan) {
         return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive());
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index 71ac557..d049ff0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -37,8 +37,7 @@ import jline.internal.Preconditions;
  * VariableOrder are specified. This is so that the sibling node (the node that
  * wasn't updated) can be scanned to obtain results that can be joined with the
  * VisibilityBindingSet. The assumption here is that the Span is derived from
- * the {@link Binding}s of common variables between the join children, with
- * Values ordered according to the indicated {@link VariableOrder}. This class
+ * the {@link Binding}s of common variables between the join children. This class
  * represents a batch order to perform a given task on join BindingSet results.
  * The {@link Task} is to Add, Delete, or Update. This batch order is processed
  * by the {@link BatchObserver} and used with the nodeId provided to the
@@ -54,7 +53,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
 
     private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
     private VisibilityBindingSet bs; //update for join child indicated by side
-    private VariableOrder varOrder; //variable order for child indicated by Span
     private Side side;  //join child that was updated by bs
     private JoinType join;
     /**
@@ -63,20 +61,18 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
      * @param column - Column of join child to be scanned
      * @param span - span of join child to be scanned (derived from common variables of left and right join children)
      * @param bs - BindingSet to be joined with results of child scan
-     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
      * @param side - The side of the child that the VisibilityBindingSet update occurred at
      * @param join - JoinType (left, right, natural inner)
      */
-    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) {
         super(batchSize, task, column, span);
         this.bs = Preconditions.checkNotNull(bs);
-        this.varOrder = Preconditions.checkNotNull(varOrder);
         this.side = Preconditions.checkNotNull(side);
         this.join = Preconditions.checkNotNull(join);
     }
     
-    public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
-        this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join);
+    public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) {
+        this(DEFAULT_BATCH_SIZE, task, column, span, bs, side, join);
     }
     
     /**
@@ -95,13 +91,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
         return join;
     }
     
-    /**
-     * Returns the VariableOrder for the join child corresponding to the Span.
-     * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s.
-     */
-    public VariableOrder getVarOrder() {
-        return varOrder;
-    }
 
    /**
     * Sets the VisibilityBindingSet that represents an update to the join child.  The join child
@@ -129,7 +118,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
                 .append("    Batch Size: " + super.getBatchSize() + "\n")
                 .append("    Task: " + super.getTask() + "\n")
                 .append("    Column: " + super.getColumn() + "\n")
-                .append("    VariableOrder: " + varOrder + "\n")
                 .append("    Join Type: " + join + "\n")
                 .append("    Join Side: " + side + "\n")
                 .append("    Binding Set: " + bs + "\n")
@@ -149,12 +137,12 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
 
         JoinBatchInformation batch = (JoinBatchInformation) other;
         return super.equals(other) &&  Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join)
-                && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder);
+                && Objects.equals(this.side, batch.side);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder);
+        return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side);
     }
     
     
@@ -169,7 +157,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
         private Column column;
         private Span span;
         private VisibilityBindingSet bs;
-        private VariableOrder varOrder;
         private JoinType join;
         private Side side;
    
@@ -237,19 +224,10 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
         }
    
         /**
-         * Sets the variable order for the join child corresponding to the Span
-         * @param varOrder - Variable order used to join BindingSet with result of scan
-         */
-        public Builder setVarOrder(VariableOrder varOrder) {
-            this.varOrder = varOrder;
-            return this;
-        }
-        
-        /**
          * @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder
          */
         public JoinBatchInformation build() {
-            return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join); 
+            return new JoinBatchInformation(batchSize, task, column, span, bs, side, join); 
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
index 9f3f1a6..f42a2ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
@@ -60,7 +60,6 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch
         result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
         result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
         result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
-        result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders())));
         result.add("side", new JsonPrimitive(batch.getSide().name()));
         result.add("joinType", new JsonPrimitive(batch.getJoinType().name()));
         String updateVarOrderString = Joiner.on(";").join(batch.getBs().getBindingNames());
@@ -82,12 +81,11 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch
         boolean startInc = json.get("startInc").getAsBoolean();
         boolean endInc = json.get("endInc").getAsBoolean();
         Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
-        VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString());
         VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString());
         VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
         Side side = Side.valueOf(json.get("side").getAsString());
         JoinType join = JoinType.valueOf(json.get("joinType").getAsString());
-        return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder)
+        return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs)
                .setSide(side).setJoinType(join).build();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 8cd25d0..2eae4ff 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -108,6 +108,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  *     <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
  *     <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
+ *     <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr> 
  *     <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
  *   </table>
  * </p>
@@ -238,6 +239,7 @@ public class FluoQueryColumns {
     public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId");
     public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId");
     public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId");
+    public static final Column JOIN_BATCH_SIZE = new Column(JOIN_METADATA_CF, "joinBatchSize");
     public static final Column JOIN_BINDING_SET = new Column(JOIN_METADATA_CF, "bindingSet");
 
     // Statement Pattern Metadata columns.
@@ -340,6 +342,7 @@ public class FluoQueryColumns {
                         JOIN_TYPE,
                         JOIN_PARENT_NODE_ID,
                         JOIN_LEFT_CHILD_NODE_ID,
+                        JOIN_BATCH_SIZE, 
                         JOIN_RIGHT_CHILD_NODE_ID)),
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 5ba7383..1c34836 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -385,6 +385,7 @@ public class FluoQueryMetadataDAO {
         tx.set(rowId, FluoQueryColumns.JOIN_TYPE, metadata.getJoinType().toString() );
         tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, metadata.getParentNodeId() );
         tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, metadata.getLeftChildNodeId() );
+        tx.set(rowId, FluoQueryColumns.JOIN_BATCH_SIZE, Integer.toString(metadata.getJoinBatchSize()));
         tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, metadata.getRightChildNodeId() );
     }
 
@@ -410,6 +411,7 @@ public class FluoQueryMetadataDAO {
                 FluoQueryColumns.JOIN_TYPE,
                 FluoQueryColumns.JOIN_PARENT_NODE_ID,
                 FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
+                FluoQueryColumns.JOIN_BATCH_SIZE,
                 FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
 
         // Return an object holding them.
@@ -421,12 +423,14 @@ public class FluoQueryMetadataDAO {
 
         final String parentNodeId = values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID);
         final String leftChildNodeId = values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID);
+        final int joinBatchSize = Integer.parseInt(values.get(FluoQueryColumns.JOIN_BATCH_SIZE));
         final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
 
         return JoinMetadata.builder(nodeId)
                 .setVarOrder(varOrder)
                 .setJoinType(joinType)
                 .setParentNodeId(parentNodeId)
+                .setJoinBatchSize(joinBatchSize)
                 .setLeftChildNodeId(leftChildNodeId)
                 .setRightChildNodeId(rightChildNodeId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index aa79daf..d6c488b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 
 /**
  * Metadata that is specific to Join nodes.
@@ -49,6 +50,9 @@ public class JoinMetadata extends CommonNodeMetadata {
     private final String parentNodeId;
     private final String leftChildNodeId;
     private final String rightChildNodeId;
+    private final int joinBatchSize;
+    /** Batch size used for joins whose batch size was not explicitly set. */
+    public static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
 
     /**
      * Constructs an instance of {@link JoinMetadata}.
@@ -59,6 +63,7 @@ public class JoinMetadata extends CommonNodeMetadata {
      * @param parentNodeId - The node id of this node's parent. (not null)
      * @param leftChildNodeId - One of the nodes whose results are being joined. (not null)
      * @param rightChildNodeId - The other node whose results are being joined. (not null)
+     * @param joinBatchSize - Batch size used to process large joins. (must be greater than 0)
      */
     public JoinMetadata(
             final String nodeId,
@@ -66,12 +71,15 @@ public class JoinMetadata extends CommonNodeMetadata {
             final JoinType joinType,
             final String parentNodeId,
             final String leftChildNodeId,
-            final String rightChildNodeId) {
+            final String rightChildNodeId,
+            final int joinBatchSize) {
         super(nodeId, varOrder);
         this.joinType = checkNotNull(joinType);
         this.parentNodeId = checkNotNull(parentNodeId);
         this.leftChildNodeId = checkNotNull(leftChildNodeId);
         this.rightChildNodeId = checkNotNull(rightChildNodeId);
+        Preconditions.checkArgument(joinBatchSize > 0, "The join batch size must be greater than 0, but was %s.", joinBatchSize);
+        this.joinBatchSize = joinBatchSize;
     }
 
     /**
@@ -101,6 +109,13 @@ public class JoinMetadata extends CommonNodeMetadata {
     public String getRightChildNodeId() {
         return rightChildNodeId;
     }
+
+    /**
+     * @return The batch size used to process large joins. (always greater than 0)
+     */
+    public int getJoinBatchSize() {
+        return joinBatchSize;
+    }
 
     @Override
     public int hashCode() {
@@ -110,6 +125,7 @@ public class JoinMetadata extends CommonNodeMetadata {
                 joinType,
                 parentNodeId,
                 leftChildNodeId,
+                joinBatchSize,
                 rightChildNodeId);
     }
 
@@ -127,6 +143,7 @@ public class JoinMetadata extends CommonNodeMetadata {
                         .append(parentNodeId, joinMetadata.parentNodeId)
                         .append(leftChildNodeId, joinMetadata.leftChildNodeId)
                         .append(rightChildNodeId, joinMetadata.rightChildNodeId)
+                        .append(joinBatchSize, joinMetadata.joinBatchSize)
                         .isEquals();
             }
             return false;
@@ -145,6 +162,7 @@ public class JoinMetadata extends CommonNodeMetadata {
                 .append("    Parent Node ID: " + parentNodeId + "\n")
                 .append("    Left Child Node ID: " + leftChildNodeId + "\n")
                 .append("    Right Child Node ID: " + rightChildNodeId + "\n")
+                .append("    Join Batch Size: " + joinBatchSize + "\n")
                 .append("}")
                 .toString();
     }
@@ -171,6 +189,7 @@ public class JoinMetadata extends CommonNodeMetadata {
         private String parentNodeId;
         private String leftChildNodeId;
         private String rightChildNodeId;
+        private int joinBatchSize = DEFAULT_JOIN_BATCH_SIZE;
 
         /**
          * Constructs an instance of {@link Builder}.
@@ -248,6 +267,16 @@ public class JoinMetadata extends CommonNodeMetadata {
             return this;
         }
         
+        /**
+         * Sets the batch size used to process large joins.
+         * @param joinBatchSize - Batch size used to process large joins. (must be greater than 0 when the {@link JoinMetadata} is built)
+         * @return This builder so that method invocation could be chained.
+         */
+        public Builder setJoinBatchSize(final int joinBatchSize) {
+            this.joinBatchSize = joinBatchSize;
+            return this;
+        }
+
         public String getLeftChildNodeId() {
             return leftChildNodeId;
         }
@@ -266,7 +295,8 @@ public class JoinMetadata extends CommonNodeMetadata {
                     joinType,
                     parentNodeId,
                     leftChildNodeId,
-                    rightChildNodeId);
+                    rightChildNodeId,
+                    joinBatchSize);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index fe130fb..e46b405 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -178,6 +179,7 @@ public class QueryMetadata extends CommonNodeMetadata {
         private String childNodeId;
         private Set<ExportStrategy> exportStrategies;
         private QueryType queryType;
+        private Optional<Integer> joinBatchSize = Optional.empty();
 
         /**
          * Constructs an instance of {@link Builder}.
@@ -267,6 +269,23 @@ public class QueryMetadata extends CommonNodeMetadata {
         public String getChildNodeId() {
             return childNodeId;
         }
+
+        /**
+         * Sets the batch size used to process the joins of this query.
+         * @param joinBatchSize - The join batch size, empty if none was specified. (not null)
+         * @return This builder so that method invocation could be chained.
+         */
+        public Builder setJoinBatchSize(final Optional<Integer> joinBatchSize) {
+            this.joinBatchSize = checkNotNull(joinBatchSize);
+            return this;
+        }
+
+        /**
+         * @return The join batch size set on this builder, or empty if none was set.
+         */
+        public Optional<Integer> getJoinBatchSize() {
+            return joinBatchSize;
+        }
 
         /**
          * @return An instance of {@link QueryMetadata} build using this builder's values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 6c03be1..7bf6f45 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -102,6 +102,8 @@ public class SparqlFluoQueryBuilder {
     private TupleExpr te;
     private String queryId;
     private NodeIds nodeIds;
+    private Optional<Integer> joinBatchSize = Optional.empty();
+
     //Default behavior is to export to Kafka - subject to change when user can 
     //specify their own export strategy
     private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
@@ -137,6 +139,18 @@ public class SparqlFluoQueryBuilder {
         return this;
     }
     
+    /**
+     * Sets the batch size used to process the joins of the query being built.
+     * If this is not called, the joins use {@link JoinMetadata#DEFAULT_JOIN_BATCH_SIZE}.
+     * @param joinBatchSize - The join batch size. (must be greater than 0)
+     * @return This builder so that method invocation could be chained.
+     */
+    public SparqlFluoQueryBuilder setJoinBatchSize(final int joinBatchSize) {
+        Preconditions.checkArgument(joinBatchSize > 0, "The join batch size must be greater than 0, but was %s.", joinBatchSize);
+        this.joinBatchSize = Optional.of(joinBatchSize);
+        return this;
+    }
+
     public FluoQuery build() {
         Preconditions.checkNotNull(sparql);
         Preconditions.checkNotNull(queryId);
@@ -167,6 +181,7 @@ public class SparqlFluoQueryBuilder {
         queryBuilder.setSparql(sparql);
         queryBuilder.setChildNodeId(childNodeId);
         queryBuilder.setExportStrategies(exportStrategies);
+        queryBuilder.setJoinBatchSize(joinBatchSize);
         fluoQueryBuilder.setQueryMetadata(queryBuilder);
         
         setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
@@ -427,6 +442,8 @@ public class SparqlFluoQueryBuilder {
             joinBuilder.setJoinType(joinType);
             joinBuilder.setLeftChildNodeId( leftChildNodeId );
             joinBuilder.setRightChildNodeId( rightChildNodeId );
+            // Only override the join's default batch size when the query specifies one.
+            fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().ifPresent(joinBuilder::setJoinBatchSize);
 
             // Figure out the variable order for each child node's binding set and
             // store it. Also store that each child node's parent is this join.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
index 303f9bb..ac41160 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -59,4 +59,16 @@ public class FluoQueryUtils {
         return queryIdParts[1];
     }
     
+    /**
+     * Gathers the node ids of every metadata node in a {@link FluoQuery} by running
+     * a {@link NodeIdCollector} over it, which performs a pre-order traversal.
+     * @param query - The FluoQuery to traverse.
+     * @return The node ids, in the order the pre-order traversal visited them.
+     */
+    public static List<String> collectNodeIds(final FluoQuery query) {
+        final NodeIdCollector nodeIdVisitor = new NodeIdCollector(query);
+        nodeIdVisitor.visit();
+        return nodeIdVisitor.getNodeIds();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
new file mode 100644
index 0000000..6d374d8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadataVisitorBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+
+/**
+ * A visitor that does a pre-order traversal of a {@link FluoQuery} and
+ * collects the node ids of its metadata nodes along the way.
+ * <p>
+ * Each {@code visit} overload records the node's id first and then hands the node
+ * to the {@link QueryMetadataVisitorBase} implementation to continue the traversal.
+ * {@link StatementPatternMetadata} nodes are only recorded; the base implementation
+ * is not invoked for them.
+ */
+public class NodeIdCollector extends QueryMetadataVisitorBase {
+
+    private final List<String> ids = new ArrayList<>();
+
+    /**
+     * Constructs an instance of {@link NodeIdCollector}. Call {@code visit()} to
+     * perform the traversal, then {@link #getNodeIds()} to read the result.
+     *
+     * @param fluoQuery - The query whose node ids will be collected.
+     */
+    public NodeIdCollector(final FluoQuery fluoQuery) {
+        super(fluoQuery);
+    }
+
+    /**
+     * @return The node ids collected so far, in the order they were visited. This is
+     *   the collector's internal list, not a copy.
+     */
+    public List<String> getNodeIds() {
+        return ids;
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final QueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final ProjectionMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final ConstructQueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final FilterMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final JoinMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id; the base visitor is not invoked for statement patterns. */
+    public void visit(final StatementPatternMetadata metadata) {
+        ids.add(metadata.getNodeId());
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final PeriodicQueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    /** Records this node's id, then delegates to the base visitor. */
+    public void visit(final AggregationMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
index fe89325..210aa0c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
@@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Arrays;
 import java.util.Optional;
 
 import org.apache.fluo.api.data.Bytes;
@@ -32,7 +31,6 @@ import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.junit.Test;
 import org.openrdf.model.impl.URIImpl;
@@ -62,8 +60,7 @@ public class BatchInformationSerializerTest {
         
         JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
                 .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
-                .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b")))
-                .setBs(vBis).build();
+                .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setBs(vBis).build();
         
         byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
         Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
index b432868..64504ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
@@ -79,25 +79,21 @@ public class QueryBuilderVisitorTest {
         }
         
         public void visit(QueryMetadata.Builder queryBuilder) {
-            System.out.println(queryBuilder.getNodeId());
             ids.add(queryBuilder.getNodeId());
             super.visit(queryBuilder);
         }
         
         public void visit(ProjectionMetadata.Builder projectionBuilder) {
-            System.out.println(projectionBuilder.getNodeId());
             ids.add(projectionBuilder.getNodeId());
             super.visit(projectionBuilder);
         }
         
         public void visit(JoinMetadata.Builder joinBuilder) {
-            System.out.println(joinBuilder.getNodeId());
             ids.add(joinBuilder.getNodeId());
             super.visit(joinBuilder);
         }
         
         public void visit(StatementPatternMetadata.Builder statementBuilder) {
-            System.out.println(statementBuilder.getNodeId());
             ids.add(statementBuilder.getNodeId());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
deleted file mode 100644
index 1707308..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.integration;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
-import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
-import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-public class BatchDeleteIT extends RyaExportITBase {
-
-    private static final Logger log = Logger.getLogger(BatchDeleteIT.class);
-    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
-
-    @Test
-    public void simpleScanDelete() throws Exception {
-
-        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
-                + " <urn:predicate_2> ?object2 } ";
-        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
-            RyaURI subj = new RyaURI("urn:subject_1");
-            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
-            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
-            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
-
-            // Create the PCJ table.
-            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
-            List<String> ids = getNodeIdStrings(fluoClient, queryId);
-            List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
-
-            // Stream the data into Fluo.
-            InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
-            // Verify the end results of the query match the expected results.
-            getMiniFluo().waitForObservers();
-
-            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10));
-
-            createSpanBatches(fluoClient, ids, prefixes, 10);
-            getMiniFluo().waitForObservers();
-
-            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0));
-        }
-    }
-
-     @Test
-    public void simpleJoinDelete() throws Exception {
-        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
-                + " <urn:predicate_2> ?object2 } ";
-        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
-            RyaURI subj = new RyaURI("urn:subject_1");
-            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
-            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-            Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
-            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
-
-            // Create the PCJ table.
-            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
-            List<String> ids = getNodeIdStrings(fluoClient, queryId);
-            String joinId = ids.get(1);
-            String rightSp = ids.get(3);
-            QueryBindingSet bs = new QueryBindingSet();
-            bs.addBinding("subject", new URIImpl("urn:subject_1"));
-            bs.addBinding("object1", new URIImpl("urn:object_0"));
-            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
-            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
-            VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
-
-            // Stream the data into Fluo.
-            InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
-            getMiniFluo().waitForObservers();
-            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5));
-
-            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
-                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
-                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
-            // Verify the end results of the query match the expected results.
-            createSpanBatch(fluoClient, joinId, batch);
-
-            getMiniFluo().waitForObservers();
-            verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5));
-        }
-    }
-
-     @Test
-    public void simpleJoinAdd() throws Exception {
-        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
-                + " <urn:predicate_2> ?object2 } ";
-        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
-            RyaURI subj = new RyaURI("urn:subject_1");
-            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
-
-            // Create the PCJ table.
-            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
-            final String pcjId = pcjStorage.createPcj(sparql);
-
-            // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
-            List<String> ids = getNodeIdStrings(fluoClient, queryId);
-            String joinId = ids.get(1);
-            String rightSp = ids.get(3);
-            QueryBindingSet bs = new QueryBindingSet();
-            bs.addBinding("subject", new URIImpl("urn:subject_1"));
-            bs.addBinding("object1", new URIImpl("urn:object_0"));
-            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
-            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
-            VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
-
-            // Stream the data into Fluo.
-            InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
-            getMiniFluo().waitForObservers();
-            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5));
-
-            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
-                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
-                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
-            // Verify the end results of the query match the expected results.
-            createSpanBatch(fluoClient, joinId, batch);
-
-            getMiniFluo().waitForObservers();
-            verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5));
-        }
-    }
-
-    private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
-
-        Set<RyaStatement> statements = new HashSet<>();
-        final String subject = "urn:subject_";
-        final String predicate = "urn:predicate_";
-        final String object = "urn:object_";
-
-        for (int i = 0; i < numTriples; i++) {
-            RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
-            if (stmnt.getSubject() == null) {
-                stmnt.setSubject(new RyaURI(subject + i));
-            }
-            if (stmnt.getPredicate() == null) {
-                stmnt.setPredicate(new RyaURI(predicate + i));
-            }
-            if (stmnt.getObject() == null) {
-                stmnt.setObject(new RyaURI(object + i));
-            }
-            statements.add(stmnt);
-        }
-        return statements;
-    }
-
-    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
-        List<String> nodeStrings = new ArrayList<>();
-        try (Snapshot sx = fluoClient.newSnapshot()) {
-            FluoQuery query = dao.readFluoQuery(sx, queryId);
-            nodeStrings.add(queryId);
-            Collection<JoinMetadata> jMeta = query.getJoinMetadata();
-            for (JoinMetadata meta : jMeta) {
-                nodeStrings.add(meta.getNodeId());
-                nodeStrings.add(meta.getLeftChildNodeId());
-                nodeStrings.add(meta.getRightChildNodeId());
-            }
-        }
-        return nodeStrings;
-    }
-
-    private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
-
-        Preconditions.checkArgument(ids.size() == prefixes.size());
-
-        try (Transaction tx = fluoClient.newTransaction()) {
-            for (int i = 0; i < ids.size(); i++) {
-                String id = ids.get(i);
-                String bsPrefix = prefixes.get(i);
-                NodeType type = NodeType.fromNodeId(id).get();
-                Column bsCol = type.getResultColumn();
-                String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
-                Span span = Span.prefix(Bytes.of(row));
-                BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
-                        .build();
-                BatchInformationDAO.addBatch(tx, id, batch);
-            }
-            tx.commit();
-        }
-    }
-
-    private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
-        try (Transaction tx = fluoClient.newTransaction()) {
-            BatchInformationDAO.addBatch(tx, nodeId, batch);
-            tx.commit();
-        }
-    }
-
-    private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
-        try (Transaction tx = fluoClient.newTransaction()) {
-            int count = 0;
-            RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
-            Iterator<ColumnScanner> colScanners = scanner.iterator();
-            while (colScanners.hasNext()) {
-                ColumnScanner colScanner = colScanners.next();
-                Iterator<ColumnValue> vals = colScanner.iterator();
-                while (vals.hasNext()) {
-                    vals.next();
-                    count++;
-                }
-            }
-            tx.commit();
-            return count;
-        }
-    }
-
-    private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
-        Preconditions.checkArgument(ids.size() == expectedCounts.size());
-        for (int i = 0; i < ids.size(); i++) {
-            String id = ids.get(i);
-            int expected = expectedCounts.get(i);
-            NodeType type = NodeType.fromNodeId(id).get();
-            int count = countResults(fluoClient, id, type.getResultColumn());
-            log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
-            switch (type) {
-            case STATEMENT_PATTERN:
-                assertEquals(expected, count);
-                break;
-            case JOIN:
-                assertEquals(expected, count);
-                break;
-            case QUERY:
-                assertEquals(expected, count);
-                break;
-            default:
-                break;
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
new file mode 100644
index 0000000..32d0e41
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Integration tests for the Fluo batch observers: span-delete batches over every node of a query, join
+ * batches that add or delete join results, and queries whose statement pattern / join results are
+ * produced in batches.
+ */
+public class BatchIT extends RyaExportITBase {
+
+    private static final Logger log = Logger.getLogger(BatchIT.class);
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+    /**
+     * Loads 10 {@code urn:predicate_1} and 10 {@code urn:predicate_2} triples for {@code urn:subject_1},
+     * then registers one span-delete batch per query node and verifies every checked count drops to zero.
+     */
+    @Test
+    public void simpleScanDelete() throws Exception {
+
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            // One row-key prefix per node id, in the same order as ids.
+            List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            // Verify the end results of the query match the expected results.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 10, 10));
+
+            // Delete every node's results in batches of 10 and verify nothing is left.
+            createSpanBatches(fluoClient, ids, prefixes, 10);
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 0));
+        }
+    }
+
+    /**
+     * Registers a {@link Task#Delete} join batch for a single left-side binding set and verifies that only
+     * the join node's count changes (25 to 20).
+     */
+    @Test
+    public void simpleJoinDelete() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            // Positional lookups: these indices depend on the order FluoQueryUtils.collectNodeIds returns.
+            String joinId = ids.get(2);
+            String rightSp = ids.get(4);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            // Span over the right statement pattern's results for urn:subject_1.
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5));
+
+            // Register a delete batch (batch size 1) on the join node for the left-side binding set
+            // {subject=urn:subject_1, object1=urn:object_0} against the right statement pattern span.
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build();
+            createSpanBatch(fluoClient, joinId, batch);
+
+            // Verify the end results: only the join node's count drops, from 25 to 20.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 20, 5, 5));
+        }
+    }
+
+    /**
+     * Loads only the right-side statement pattern's triples, registers a {@link Task#Add} join batch for a
+     * single left-side binding set, and verifies that the resulting join results reach the query.
+     */
+    @Test
+    public void simpleJoinAdd() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            // Positional lookups: these indices depend on the order FluoQueryUtils.collectNodeIds returns.
+            String joinId = ids.get(2);
+            String rightSp = ids.get(4);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            // Span over the right statement pattern's results for urn:subject_1.
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5));
+
+            // Register an add batch (batch size 1) on the join node for the left-side binding set
+            // {subject=urn:subject_1, object1=urn:object_0} against the right statement pattern span.
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build();
+            createSpanBatch(fluoClient, joinId, batch);
+
+            // Verify the end results: the 5 new join results show up at the join and above it.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 5, 0, 5));
+        }
+    }
+
+    /**
+     * Joins 15 x 15 triples on {@code urn:subject_1} with statement pattern and join batch sizes of 5, and
+     * verifies that all 225 results are produced.
+     */
+    @Test
+    public void joinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 15);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 15);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15));
+        }
+    }
+
+    /**
+     * OPTIONAL join with batching. {@code urn:subject_1} has 10 triples on each predicate, giving 100 rows.
+     * {@code urn:subject_2} has only 10 {@code urn:predicate_1} triples, giving 10 rows without
+     * {@code ?object2}. That is 110 results in total.
+     */
+    @Test
+    public void leftJoinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + "OPTIONAL{ ?subject <urn:predicate_2> ?object2} } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+
+            subj = new RyaURI("urn:subject_2");
+            RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+            Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10));
+        }
+    }
+
+    /**
+     * Two chained joins with batching. 10 x 10 triples on {@code urn:subject_1} join on {@code ?object2}
+     * with 10 {@code urn:predicate_3} triples on {@code urn:subject_2}.
+     */
+    @Test
+    public void multiJoinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 ."
+                + " ?subject2 <urn:predicate_3> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj1 = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+            RyaURI subj2 = new RyaURI("urn:subject_2");
+            RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null);
+            Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10));
+        }
+    }
+
+    /**
+     * Builds {@code numTriples} copies of {@code statement}. In copy {@code i}, a null subject, predicate,
+     * or object is replaced with {@code urn:subject_i}, {@code urn:predicate_i}, or {@code urn:object_i}.
+     *
+     * @param statement template statement; non-null parts are shared by every copy
+     * @param numTriples number of statements to generate
+     * @return the generated statements
+     */
+    private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
+
+        Set<RyaStatement> statements = new HashSet<>();
+        final String subject = "urn:subject_";
+        final String predicate = "urn:predicate_";
+        final String object = "urn:object_";
+
+        for (int i = 0; i < numTriples; i++) {
+            RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
+            if (stmnt.getSubject() == null) {
+                stmnt.setSubject(new RyaURI(subject + i));
+            }
+            if (stmnt.getPredicate() == null) {
+                stmnt.setPredicate(new RyaURI(predicate + i));
+            }
+            if (stmnt.getObject() == null) {
+                stmnt.setObject(new RyaURI(object + i));
+            }
+            statements.add(stmnt);
+        }
+        return statements;
+    }
+
+    /**
+     * Reads the metadata for {@code queryId} and returns its node ids, as produced by
+     * {@link FluoQueryUtils#collectNodeIds}. The tests rely on the order of this list (for example,
+     * index 2 for the join).
+     */
+    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+        try (Snapshot sx = fluoClient.newSnapshot()) {
+            FluoQuery query = dao.readFluoQuery(sx, queryId);
+            return FluoQueryUtils.collectNodeIds(query);
+        }
+    }
+
+    /**
+     * Registers one {@link SpanBatchDeleteInformation} per node id, all in a single transaction. The batch
+     * for {@code ids[i]} covers that node's result column over rows starting with
+     * {@code ids[i] + NODEID_BS_DELIM + prefixes[i]}.
+     *
+     * @throws IllegalArgumentException if {@code ids} and {@code prefixes} differ in size
+     */
+    private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
+
+        Preconditions.checkArgument(ids.size() == prefixes.size());
+
+        try (Transaction tx = fluoClient.newTransaction()) {
+            for (int i = 0; i < ids.size(); i++) {
+                String id = ids.get(i);
+                String bsPrefix = prefixes.get(i);
+                NodeType type = NodeType.fromNodeId(id).get();
+                Column bsCol = type.getResultColumn();
+                String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
+                Span span = Span.prefix(Bytes.of(row));
+                BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
+                        .build();
+                BatchInformationDAO.addBatch(tx, id, batch);
+            }
+            tx.commit();
+        }
+    }
+
+    /** Registers a single {@code batch} for {@code nodeId} in its own transaction. */
+    private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
+        try (Transaction tx = fluoClient.newTransaction()) {
+            BatchInformationDAO.addBatch(tx, nodeId, batch);
+            tx.commit();
+        }
+    }
+
+    /**
+     * Counts the values stored in {@code bsColumn} across every row whose key starts with {@code nodeId}.
+     * This is a read-only scan, so it uses a snapshot rather than a transaction that would have to be
+     * committed.
+     */
+    private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
+        try (Snapshot sx = fluoClient.newSnapshot()) {
+            int count = 0;
+            RowScanner scanner = sx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+            for (ColumnScanner colScanner : scanner) {
+                for (ColumnValue ignored : colScanner) {
+                    count++;
+                }
+            }
+            return count;
+        }
+    }
+
+    /**
+     * Asserts that each node in {@code ids} has the matching entry of {@code expectedCounts} as its result
+     * count. Only STATEMENT_PATTERN, JOIN, and QUERY nodes are asserted. Other node types are counted and
+     * logged, but their expected value is not checked.
+     *
+     * @throws IllegalArgumentException if the two lists differ in size
+     */
+    private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
+        Preconditions.checkArgument(ids.size() == expectedCounts.size());
+        for (int i = 0; i < ids.size(); i++) {
+            String id = ids.get(i);
+            int expected = expectedCounts.get(i);
+            NodeType type = NodeType.fromNodeId(id).get();
+            int count = countResults(fluoClient, id, type.getResultColumn());
+            log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
+            if (type == NodeType.STATEMENT_PATTERN || type == NodeType.JOIN || type == NodeType.QUERY) {
+                assertEquals("Unexpected result count for node " + id, expected, count);
+            }
+        }
+    }
+
+}