You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:02:00 UTC

[7/9] incubator-rya git commit: RYA-280-Periodic Query Service. Closes #177.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
new file mode 100644
index 0000000..581aa5b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
@@ -0,0 +1,68 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.UUID;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class for creating the {@link Bytes} written to the Fluo Row used to identify each {@link BatchInformation}
+ * object.  Each Bytes row is formed by joining a query node id and a batch id with a delimiter.
+ *
+ */
+public class BatchRowKeyUtil {
+
+    /**
+     * Creates a Bytes row from the query node id by delegating to {@link #getRow(String, String)} with a random UUID (dashes removed) as the batch id.
+     * @param nodeId - query node id that the batch task will be performed on
+     * @return Bytes row used to identify the BatchInformation
+     */
+    public static Bytes getRow(String nodeId) {
+        String batchId = UUID.randomUUID().toString().replace("-", "");
+        return getRow(nodeId, batchId);
+    }
+
+    /**
+     * Creates a Bytes row by joining the nodeId and the batchId with NODEID_BS_DELIM.
+     * @param nodeId - query node id that batch task will be performed on
+     * @param batchId - id used to identify batch
+     * @return Bytes row used to identify the BatchInformation
+     */
+    public static Bytes getRow(String nodeId, String batchId) {
+        String row = new StringBuilder().append(nodeId).append(IncrementalUpdateConstants.NODEID_BS_DELIM)
+                .append(batchId).toString();
+        return Bytes.of(row);
+    }
+
+    /**
+     * Given a Bytes row, returns the query node id, i.e. the part that precedes NODEID_BS_DELIM.
+     * @param row - the Bytes row used to identify the BatchInformation
+     * @return - the id of the query node that the batch task is performed on
+     * @throws IllegalArgumentException if the row does not split into exactly two parts around the delimiter
+     */
+    public static String getNodeId(Bytes row) {
+        // The delimiter is quoted because String#split treats its argument as a regular expression.
+        String[] stringArray = row.toString().split(java.util.regex.Pattern.quote(IncrementalUpdateConstants.NODEID_BS_DELIM));
+        Preconditions.checkArgument(stringArray.length == 2, "Row %s does not consist of a node id and a batch id separated by the delimiter.", row);
+        return stringArray[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
new file mode 100644
index 0000000..a266341
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
@@ -0,0 +1,184 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Performs updates to BindingSets in the JoinBindingSet column in batch fashion.
+ */
+public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
+
+    private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class);
+    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+    /**
+     * Processes a {@link JoinBatchInformation}. At most
+     * {@link JoinBatchInformation#getBatchSize()} BindingSets of the sibling
+     * join child are scanned and joined with the BindingSet carried by the
+     * batch, and the batch's {@link Task} is applied to every join result.
+     * If the scan stopped because the batch size was reached, the batch is
+     * registered again with a new Span so the remaining entries get processed.
+     * @param tx - Fluo transaction in which the batch is processed
+     * @param row - Bytes row identifying the batch; the join node id is parsed from it
+     * @param batch - batch order to process; must be a JoinBatchInformation
+     * @throws Exception - if processing the batch fails
+     */
+    @Override
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
+        super.processBatch(tx, row, batch);
+        final String joinNodeId = BatchRowKeyUtil.getNodeId(row);
+        Preconditions.checkArgument(batch instanceof JoinBatchInformation);
+        final JoinBatchInformation joinInfo = (JoinBatchInformation) batch;
+        final Task batchTask = joinInfo.getTask();
+
+        // Pick the join implementation that matches the batch's join type.
+        final IterativeJoin join;
+        switch (joinInfo.getJoinType()) {
+        case LEFT_OUTER_JOIN:
+            join = new LeftOuterJoin();
+            break;
+        case NATURAL_JOIN:
+            join = new NaturalJoin();
+            break;
+        default:
+            throw new RuntimeException("Unsupported JoinType: " + joinInfo.getJoinType());
+        }
+
+        // Read one batch of sibling results; a present RowColumn signals that more remain.
+        final Set<VisibilityBindingSet> siblingResults = new HashSet<>();
+        final Optional<RowColumn> stoppedAt = fillSiblingBatch(tx, joinInfo, siblingResults);
+
+        // Join the updated BindingSet with the sibling results, passing it
+        // as the left or right operand according to the side it arrived on.
+        final VisibilityBindingSet updatedBs = joinInfo.getBs();
+        final boolean updateOnLeft = joinInfo.getSide() == Side.LEFT;
+        final Iterator<VisibilityBindingSet> joined = updateOnLeft
+                ? join.newLeftResult(updatedBs, siblingResults.iterator())
+                : join.newRightResult(siblingResults.iterator(), updatedBs);
+
+        // Apply the task to every join result in the join node's binding set column.
+        final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, joinNodeId);
+        final VariableOrder resultVarOrder = joinMetadata.getVariableOrder();
+        while (joined.hasNext()) {
+            final VisibilityBindingSet result = joined.next();
+            // value handed to the task: the serialized join result
+            final Bytes value = BS_SERDE.serialize(result);
+            // row key built from the node id, the join's variable order and the result
+            final Bytes resultRow = RowKeyUtil.makeRowKey(joinNodeId, resultVarOrder, result);
+            final Column resultColumn = FluoQueryColumns.JOIN_BINDING_SET;
+            processTask(tx, batchTask, resultRow, resultColumn, value);
+        }
+
+        // The batch limit was met, so entries are left over: update the
+        // Span and register the batch again to process the remainder.
+        if (stoppedAt.isPresent()) {
+            final Span remaining = getNewSpan(stoppedAt.get(), joinInfo.getSpan());
+            joinInfo.setSpan(remaining);
+            BatchInformationDAO.addBatch(tx, joinNodeId, joinInfo);
+        }
+
+    }
+
+    /** Applies the task to one row: Add writes the value, Delete removes the column; other tasks are only logged. */
+    private void processTask(TransactionBase tx, Task task, Bytes row, Column column, Bytes value) {
+        switch (task) {
+        case Delete:
+            tx.delete(row, column);
+            return;
+        case Add:
+            tx.set(row, column, value);
+            return;
+        case Update:
+            log.trace("The Task Update is not supported for JoinBatchBindingSetUpdater.  Batch will not be processed.");
+            return;
+        default:
+            log.trace("Invalid Task type.  Aborting batch operation.");
+        }
+    }
+
+    /**
+     * Scans the Span and Column given by the {@link JoinBatchInformation} and
+     * adds the deserialized values to bsSet. Scanning stops early when bsSet
+     * already holds batch-size many elements and another value is available.
+     *
+     * @param tx - Fluo transaction in which batch operation is performed
+     * @param batch - batch order to be processed
+     * @param bsSet - set that the scanned BindingSets are added to
+     * @return RowColumn (last row read, scanned Column) if the scan stopped early, otherwise empty
+     * @throws Exception - if scanning or deserializing the sibling results fails
+     */
+    private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
+
+        final Span scanSpan = batch.getSpan();
+        final Column scanColumn = batch.getColumn();
+        final int limit = batch.getBatchSize();
+
+        final RowScanner rowScanner = tx.scanner().over(scanSpan).fetch(scanColumn).byRow().build();
+        final Iterator<ColumnScanner> rows = rowScanner.iterator();
+
+        // Remember the row being read so the caller knows where the scan stopped.
+        Bytes currentRow = scanSpan.getStart().getRow();
+        boolean limitReached = false;
+        while (rows.hasNext() && !limitReached) {
+            final ColumnScanner columns = rows.next();
+            currentRow = columns.getRow();
+            final Iterator<ColumnValue> values = columns.iterator();
+            while (!limitReached && values.hasNext()) {
+                // The size check comes before next() so that the value which
+                // exceeds the limit is left unread for the follow-up batch.
+                if (bsSet.size() < limit) {
+                    bsSet.add(BS_SERDE.deserialize(values.next().getValue()));
+                } else {
+                    limitReached = true;
+                }
+            }
+        }
+
+        // An empty Optional tells the caller that the whole Span was consumed.
+        return limitReached ? Optional.of(new RowColumn(currentRow, scanColumn)) : Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
new file mode 100644
index 0000000..71ac557
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -0,0 +1,255 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the join's
+ * children. The join has two children, and for one child a VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node that
+ * wasn't updated) can be scanned to obtain results that can be joined with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This class
+ * represents a batch order to perform a given task on join BindingSet results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for
+ * results using the indicated Span and Column. These results are joined with
+ * the indicated VisibilityBindingSet, and the results are added to the parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
+    private final VisibilityBindingSet bs; //update for join child indicated by side
+    private final VariableOrder varOrder; //variable order for child indicated by Span
+    private final Side side;  //join child that was updated by bs
+    private final JoinType join; //type of join performed between bs and the scanned results
+    /**
+     * Creates a batch order with an explicit batch size. Throws a NullPointerException if bs, varOrder, side or join is null.
+     * @param batchSize - batch size that Tasks are performed in
+     * @param task - Add, Delete, or Update
+     * @param column - Column of join child to be scanned
+     * @param span - span of join child to be scanned (derived from common variables of left and right join children)
+     * @param bs - BindingSet to be joined with results of child scan
+     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
+     * @param side - The side of the child that the VisibilityBindingSet update occurred at
+     * @param join - JoinType (left, right, natural inner)
+     */
+    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        super(batchSize, task, column, span);
+        this.bs = Objects.requireNonNull(bs, "bs");
+        this.varOrder = Objects.requireNonNull(varOrder, "varOrder");
+        this.side = Objects.requireNonNull(side, "side");
+        this.join = Objects.requireNonNull(join, "join");
+    }
+
+    /** Creates a batch order that uses DEFAULT_BATCH_SIZE as its batch size; otherwise identical to the full constructor. */
+    public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join);
+    }
+
+    /**
+     * Indicates the join child that the BindingSet result {@link JoinBatchInformation#getBs()} updated.
+     * This BindingSet is joined with the results obtained by scanning over the value of {@link JoinBatchInformation#getSpan()}.
+     * @return {@link Side} indicating which side new result occurred on in join
+     */
+    public Side getSide() {
+        return side;
+    }
+
+    /**
+     * @return {@link JoinType} indicating type of join (left join, right join, natural inner join,...)
+     */
+    public JoinType getJoinType() {
+        return join;
+    }
+
+    /**
+     * Returns the VariableOrder for the join child corresponding to the Span.
+     * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s.
+     */
+    public VariableOrder getVarOrder() {
+        return varOrder;
+    }
+
+    /**
+     * Returns the VisibilityBindingSet that represents an update to the join child.  The join child
+     * updated is indicated by the value of {@link JoinBatchInformation#getSide()}.
+     * @return VisibilityBindingSet that will be joined with results returned by scan over given
+     * {@link Span}.
+     */
+    public VisibilityBindingSet getBs() {
+        return bs;
+    }
+
+    /**
+     * @return BatchBindingSetUpdater used to apply {@link Task} to results formed by joining the given
+     * VisibilityBindingSet with the results returned by scanning over the Span.
+     */
+    @Override
+    public BatchBindingSetUpdater getBatchUpdater() {
+        return updater;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Span Batch Information {\n")
+                .append("    Batch Size: ").append(super.getBatchSize()).append("\n")
+                .append("    Task: ").append(super.getTask()).append("\n")
+                .append("    Column: ").append(super.getColumn()).append("\n")
+                .append("    VariableOrder: ").append(varOrder).append("\n")
+                .append("    Join Type: ").append(join).append("\n")
+                .append("    Join Side: ").append(side).append("\n")
+                .append("    Binding Set: ").append(bs).append("\n")
+                .append("}")
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof JoinBatchInformation)) {
+            return false;
+        }
+        JoinBatchInformation batch = (JoinBatchInformation) other;
+        return super.equals(other) && Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join)
+                && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder);
+    }
+
+    /** @return a new {@link Builder} whose batch size is preset to DEFAULT_BATCH_SIZE */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private int batchSize = DEFAULT_BATCH_SIZE;
+        private Task task;
+        private Column column;
+        private Span span;
+        private VisibilityBindingSet bs;
+        private VariableOrder varOrder;
+        private JoinType join;
+        private Side side;
+
+        /**
+         * @param batchSize - batch size that {@link Task}s are performed in
+         */
+        public Builder setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * @param task - Task performed (Add, Delete, Update)
+         */
+        public Builder setTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        /**
+         * @param column - Column of join child to be scanned
+         */
+        public Builder setColumn(Column column) {
+            this.column = column;
+            return this;
+        }
+
+        /**
+         * Span to scan results for one child of the join. The Span corresponds to the side of
+         * the join that is not indicated by Side.  So if Side is Left, then the
+         * Span will scan the right child of the join.  It is assumed that the span is derived from
+         * the common variables of the left and right join children.
+         * @param span - Span over join child to be scanned
+         */
+        public Builder setSpan(Span span) {
+            this.span = span;
+            return this;
+        }
+
+        /**
+         * Sets the BindingSet that corresponds to an update to the join child indicated
+         * by Side.
+         * @param bs - BindingSet update of join child to be joined with results of scan
+         */
+        public Builder setBs(VisibilityBindingSet bs) {
+            this.bs = bs;
+            return this;
+        }
+
+        /**
+         * @param join - JoinType (left, right, natural inner)
+         */
+        public Builder setJoinType(JoinType join) {
+            this.join = join;
+            return this;
+        }
+
+        /**
+         * Indicates the join child corresponding to the VisibilityBindingSet update
+         * @param side - side of join the child BindingSet update appeared at
+         */
+        public Builder setSide(Side side) {
+            this.side = side;
+            return this;
+        }
+
+        /**
+         * Sets the variable order for the join child corresponding to the Span
+         * @param varOrder - Variable order used to join BindingSet with result of scan
+         */
+        public Builder setVarOrder(VariableOrder varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+
+        /**
+         * @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder
+         */
+        public JoinBatchInformation build() {
+            return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
new file mode 100644
index 0000000..749a77d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link Span}
+ * of the BatchInformation object.  This class will delete entries until the
+ * batch size is met, and then create a new SpanBatchDeleteInformation object
+ * with an updated Span whose starting point is the stopping point of this
+ * batch.  If the batch limit is not met, then a new batch is not created and
+ * the task is complete.
+ *
+ */
+public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
+
+    private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+    /**
+     * Process SpanBatchDeleteInformation objects by deleting all entries indicated
+     * by Span until batch limit is met.
+     * @param tx - Fluo Transaction
+     * @param row - Byte row identifying BatchInformation
+     * @param batch - SpanBatchDeleteInformation object to be processed
+     * @throws Exception - if processing the batch fails
+     */
+    @Override
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
+        super.processBatch(tx, row, batch);
+        Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
+        SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+        Task task = spanBatch.getTask();
+        int batchSize = spanBatch.getBatchSize();
+        Span span = spanBatch.getSpan();
+        Column column = batch.getColumn();
+        Optional<RowColumn> rowCol = Optional.empty();
+
+        switch (task) {
+        case Add:
+            log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
+            break;
+        case Delete:
+            rowCol = deleteBatch(tx, span, column, batchSize);
+            break;
+        case Update:
+            log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
+            break;
+        default:
+            log.trace("Invalid Task type.  Aborting batch operation.");
+            break;
+        }
+
+        if (rowCol.isPresent()) {
+            Span newSpan = getNewSpan(rowCol.get(), spanBatch.getSpan());
+            log.trace("Batch size met.  There are remaining results that need to be deleted.  Creating a new batch of size: "
+                    + spanBatch.getBatchSize() + " with Span: " + newSpan + " and Column: " + column);
+            spanBatch.setSpan(newSpan);
+            BatchInformationDAO.addBatch(tx, BatchRowKeyUtil.getNodeId(row), spanBatch);
+        }
+    }
+
+    /**
+     * Deletes up to batchSize entries of the given Column that the scan over the Span returns.
+     * @return RowColumn of the last row read if the batch limit was met; empty otherwise or if the scan failed
+     */
+    private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) {
+        log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column);
+        RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
+        try {
+            Iterator<ColumnScanner> colScannerIter = rs.iterator();
+            int count = 0;
+            boolean batchLimitMet = false;
+            Bytes row = span.getStart().getRow();
+            while (colScannerIter.hasNext() && !batchLimitMet) {
+                ColumnScanner colScanner = colScannerIter.next();
+                row = colScanner.getRow();
+                Iterator<ColumnValue> iter = colScanner.iterator();
+                while (iter.hasNext()) {
+                    if (count >= batchSize) {
+                        batchLimitMet = true;
+                        break;
+                    }
+                    ColumnValue colVal = iter.next();
+                    tx.delete(row, colVal.getColumn());
+                    count++;
+                }
+            }
+
+            return batchLimitMet ? Optional.of(new RowColumn(row)) : Optional.empty();
+        } catch (Exception e) {
+            // Still best effort, but the failure is reported; returning empty means no follow-up batch is registered.
+            log.warn("Batch delete using Span: " + span + " and Column: " + column + " failed.", e);
+            return Optional.empty();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
new file mode 100644
index 0000000..3b1e245
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
@@ -0,0 +1,95 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * This class represents a batch order to delete all entries in the Fluo table indicated
+ * by the given Span and Column.  These batch orders are processed by the {@link BatchObserver},
+ * which uses this batch information along with the nodeId passed into the Observer to perform
+ * batch deletes.  
+ *
+ */
+public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater UPDATER = new SpanBatchBindingSetUpdater();
+
+    /**
+     * Creates a batch order whose {@link Task} is always {@code Task.Delete}.
+     *
+     * @param batchSize - {@link Task}s are applied in batches of this size
+     * @param column - column the batch Task will be applied to
+     * @param span - span the batch Task will be applied to
+     */
+    public SpanBatchDeleteInformation(int batchSize, Column column, Span span) {
+        super(batchSize, Task.Delete, column, span);
+    }
+
+    /** @return a new {@link Builder} with no column or span set */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column}
+     */
+    @Override
+    public BatchBindingSetUpdater getBatchUpdater() {
+        return UPDATER;
+    }
+
+    /** Fluent builder; batch size defaults to DEFAULT_BATCH_SIZE, column and span start out null. */
+    public static class Builder {
+        private Span targetSpan;
+        private Column targetColumn;
+        private int size = DEFAULT_BATCH_SIZE;
+
+        /**
+         * @param span - span that batch {@link Task} will be applied to
+         */
+        public Builder setSpan(Span span) {
+            targetSpan = span;
+            return this;
+        }
+
+        /**
+         * @param column - column that batch {@link Task} will be applied to
+         */
+        public Builder setColumn(Column column) {
+            targetColumn = column;
+            return this;
+        }
+
+        /**
+         * @param batchSize - {@link Task}s are applied in batches of this size
+         */
+        public Builder setBatchSize(int batchSize) {
+            size = batchSize;
+            return this;
+        }
+
+        /**
+         * @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder
+         */
+        public SpanBatchDeleteInformation build() {
+            return new SpanBatchDeleteInformation(size, targetColumn, targetSpan);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
new file mode 100644
index 0000000..e6f69d0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
@@ -0,0 +1,58 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Optional;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Serializer/Deserializer for {@link BatchInformation} objects that uses the Gson
+ * Type Adapter {@link BatchInformationTypeAdapter} to do all of the serializing and deserializing.
+ * 
+ *
+ */
+public class BatchInformationSerializer {
+    private static final Logger log = Logger.getLogger(BatchInformationSerializer.class);
+    private static final Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(BatchInformation.class, new BatchInformationTypeAdapter())
+            .create();
+
+    /** @return arg0 as UTF-8 encoded JSON; a failure is logged and rethrown wrapped in a RuntimeException */
+    public static byte[] toBytes(BatchInformation arg0) {
+        try {
+            return gson.toJson(arg0).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        } catch (RuntimeException e) {
+            log.info("Unable to serialize BatchInformation: " + arg0);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** @return the batch parsed from UTF-8 JSON, or empty if arg0 is null, fails to parse, or parses to null */
+    public static Optional<BatchInformation> fromBytes(byte[] arg0) {
+        try {
+            return Optional.ofNullable(gson.fromJson(new String(arg0, java.nio.charset.StandardCharsets.UTF_8), BatchInformation.class));
+        } catch (RuntimeException e) {
+            log.info("Unable to deserialize BatchInformation from the supplied bytes.", e);
+            return Optional.empty();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
new file mode 100644
index 0000000..d7c15df
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
@@ -0,0 +1,73 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer for serializing/deserializing
+ * {@link BatchInformation} objects. This makes use of the
+ * {@link BatchInformationTypeAdapterFactory} to retrieve the appropriate
+ * JsonSerializer/JsonDeserializer given the class name of the particular
+ * implementation of BatchInformation.
+ *
+ */
+public class BatchInformationTypeAdapter implements JsonSerializer<BatchInformation>, JsonDeserializer<BatchInformation> {
+
+    private static final Logger log = Logger.getLogger(BatchInformationTypeAdapter.class);
+    private static final BatchInformationTypeAdapterFactory factory = new BatchInformationTypeAdapterFactory();
+
+    /** Delegates to the deserializer registered under the element's "class" value; any failure is rethrown as a JsonParseException. */
+    @Override
+    public BatchInformation deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+        try {
+            JsonObject json = arg0.getAsJsonObject();
+            String type = json.get("class").getAsString();
+            JsonDeserializer<? extends BatchInformation> deserializer = factory.getDeserializerFromName(type);
+            return deserializer.deserialize(arg0, arg1, arg2);
+        } catch (Exception e) {
+            log.trace("Unable to deserialize JsonElement: " + arg0);
+            throw new JsonParseException("Unable to deserialize BatchInformation from JsonElement: " + arg0, e);
+        }
+    }
+
+    /** Delegates to the serializer registered for the batch's class; unsupported types fail with an IllegalArgumentException. */
+    @Override
+    public JsonElement serialize(BatchInformation batch, Type arg1, JsonSerializationContext arg2) {
+        JsonSerializer<? extends BatchInformation> serializer = factory.getSerializerFromName(batch.getClass().getName());
+        if (serializer instanceof SpanBatchInformationTypeAdapter && batch instanceof SpanBatchDeleteInformation) {
+            return ((SpanBatchInformationTypeAdapter) serializer).serialize((SpanBatchDeleteInformation) batch, arg1, arg2);
+        } else if (serializer instanceof JoinBatchInformationTypeAdapter && batch instanceof JoinBatchInformation) {
+            return ((JoinBatchInformationTypeAdapter) serializer).serialize((JoinBatchInformation) batch, arg1, arg2);
+        }
+        throw new IllegalArgumentException("No serializer is registered for BatchInformation class: " + batch.getClass().getName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
new file mode 100644
index 0000000..0221bc2
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
@@ -0,0 +1,65 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Map;
+
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Factory that uses class names to return the appropriate {@link JsonSerializer} and {@link JsonDeserializer} for serializing
+ * and deserializing {@link BatchInformation} objects.
+ *
+ */
+public class BatchInformationTypeAdapterFactory {
+    static final Map<String, JsonSerializer<? extends BatchInformation>> serializers =
+            ImmutableMap.<String, JsonSerializer<? extends BatchInformation>>builder()
+                    .put(SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter())
+                    .put(JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter())
+                    .build();
+
+    static final Map<String, JsonDeserializer<? extends BatchInformation>> deserializers =
+            ImmutableMap.<String, JsonDeserializer<? extends BatchInformation>>builder()
+                    .put(SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter())
+                    .put(JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter())
+                    .build();
+
+    /**
+     * Looks up the {@link JsonSerializer} registered under the class name of a {@link BatchInformation} implementation
+     * @param name - class name of the BatchInformation object
+     * @return JsonSerializer for serializing BatchInformation objects, or null if the name is not registered
+     */
+    public JsonSerializer<? extends BatchInformation> getSerializerFromName(String name) {
+        return serializers.get(name);
+    }
+
+    /**
+     * Looks up the {@link JsonDeserializer} registered under the class name of a {@link BatchInformation} implementation
+     * @param name - class name of the BatchInformation object
+     * @return JsonDeserializer for deserializing BatchInformation objects, or null if the name is not registered
+     */
+    public JsonDeserializer<? extends BatchInformation> getDeserializerFromName(String name) {
+        return deserializers.get(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
new file mode 100644
index 0000000..9f3f1a6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
@@ -0,0 +1,94 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
+
+import com.google.common.base.Joiner;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer to serialize/deserialize {@link JoinBatchInformation} objects.
+ *
+ */
+public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatchInformation>, JsonDeserializer<JoinBatchInformation> {
+
+    private static final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
+
+    /** Flattens the batch into a JsonObject of primitives; the column and span parts are each joined with a NUL character. */
+    @Override
+    public JsonElement serialize(JoinBatchInformation batch, Type typeOfSrc, JsonSerializationContext context) {
+        JsonObject result = new JsonObject();
+        result.add("class", new JsonPrimitive(batch.getClass().getName()));
+        result.add("batchSize", new JsonPrimitive(batch.getBatchSize()));
+        result.add("task", new JsonPrimitive(batch.getTask().name()));
+        Column column = batch.getColumn();
+        result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier()));
+        Span span = batch.getSpan();
+        result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
+        result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
+        result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+        result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders())));
+        result.add("side", new JsonPrimitive(batch.getSide().name()));
+        result.add("joinType", new JsonPrimitive(batch.getJoinType().name()));
+        String updateVarOrderString = Joiner.on(";").join(batch.getBs().getBindingNames());
+        VariableOrder updateVarOrder = new VariableOrder(updateVarOrderString);
+        result.add("bindingSet", new JsonPrimitive(converter.convert(batch.getBs(), updateVarOrder)));
+        result.add("updateVarOrder", new JsonPrimitive(updateVarOrderString));
+        return result;
+    }
+
+    /** Rebuilds the batch from the JsonObject layout written by serialize; empty column qualifiers and span end rows are preserved. */
+    @Override
+    public JoinBatchInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+        JsonObject json = element.getAsJsonObject();
+        int batchSize = json.get("batchSize").getAsInt();
+        Task task = Task.valueOf(json.get("task").getAsString());
+        String[] colArray = json.get("column").getAsString().split("\u0000", -1); // limit -1 keeps a trailing empty qualifier
+        Column column = new Column(colArray[0], colArray[1]);
+        String[] rows = json.get("span").getAsString().split("\u0000", -1); // limit -1 keeps a trailing empty end row
+        boolean startInc = json.get("startInc").getAsBoolean();
+        boolean endInc = json.get("endInc").getAsBoolean();
+        Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
+        VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString());
+        VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString());
+        VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
+        Side side = Side.valueOf(json.get("side").getAsString());
+        JoinType join = JoinType.valueOf(json.get("joinType").getAsString());
+        return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder)
+               .setSide(side).setJoinType(join).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
new file mode 100644
index 0000000..98deb8e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
@@ -0,0 +1,69 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects.
+ *
+ */
+public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
+    /** Rebuilds the batch from the JsonObject layout written by serialize; empty column qualifiers and span end rows are preserved. */
+    @Override
+    public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+        JsonObject json = element.getAsJsonObject();
+        int batchSize = json.get("batchSize").getAsInt();
+        String[] colArray = json.get("column").getAsString().split("\u0000", -1); // limit -1 keeps a trailing empty qualifier
+        Column column = new Column(colArray[0], colArray[1]);
+        String[] rows = json.get("span").getAsString().split("\u0000", -1); // limit -1 keeps a trailing empty end row
+        boolean startInc = json.get("startInc").getAsBoolean();
+        boolean endInc = json.get("endInc").getAsBoolean();
+        Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
+        return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build();
+    }
+
+    /** Flattens the batch into a JsonObject of primitives; the column and span parts are each joined with a NUL character. */
+    @Override
+    public JsonElement serialize(SpanBatchDeleteInformation batch, Type typeOfSrc, JsonSerializationContext context) {
+        JsonObject result = new JsonObject();
+        result.add("class", new JsonPrimitive(batch.getClass().getName()));
+        result.add("batchSize", new JsonPrimitive(batch.getBatchSize()));
+        Column column = batch.getColumn();
+        result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier()));
+        Span span = batch.getSpan();
+        result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
+        result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
+        result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
index 152d156..7c4b3cc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -36,16 +36,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+    
     private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class);
-
     private final KafkaProducer<String, VisibilityBindingSet> producer;
 
+
     /**
      * Constructs an instance given a Kafka producer.
      *
-     * @param producer
-     *            for sending result set alerts to a broker. (not null)
-     *            Can be created and configured by {@link KafkaBindingSetExporterFactory}
+     * @param producer for sending result set alerts to a broker. (not null) Can be created and configured by
+     *            {@link KafkaBindingSetExporterFactory}
      */
     public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
         super();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
index 84d3ce6..54c39b7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -24,8 +24,11 @@ import static java.util.Objects.requireNonNull;
 import java.util.Collections;
 
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -36,14 +39,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
 
     private final PrecomputedJoinStorage pcjStorage;
+    private final PeriodicQueryResultStorage periodicStorage;
 
     /**
      * Constructs an instance of {@link RyaBindingSetExporter}.
      *
      * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
      */
-    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
+    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, PeriodicQueryResultStorage periodicStorage) {
         this.pcjStorage = checkNotNull(pcjStorage);
+        this.periodicStorage = checkNotNull(periodicStorage);
     }
 
     @Override
@@ -59,8 +64,12 @@ public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
         final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
 
         try {
-            pcjStorage.addResults(pcjId, Collections.singleton(result));
-        } catch (final PCJStorageException e) {
+            if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
+                periodicStorage.addPeriodicQueryResults(pcjId, Collections.singleton(result));
+            } else {
+                pcjStorage.addResults(pcjId, Collections.singleton(result));
+            }
+        } catch (final PCJStorageException | PeriodicQueryStorageException e) {
             throw new ResultExportException("A result could not be exported to Rya.", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
index 86d593f..82ce9c6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -28,8 +28,10 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 
 import com.google.common.base.Optional;
 
@@ -62,9 +64,10 @@ public class RyaBindingSetExporterFactory implements IncrementalBindingSetExport
                 // Setup Rya PCJ Storage.
                 final String ryaInstanceName = params.getRyaInstanceName().get();
                 final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-
+                final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
+                
                 // Make the exporter.
-                final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage);
+                final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage, periodicStorage);
                 return Optional.of(exporter);
 
             } catch (final AccumuloException | AccumuloSecurityException e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index ac131e3..3a731c2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -30,12 +30,14 @@ import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
@@ -50,7 +52,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 @DefaultAnnotation(NonNull.class)
 public abstract class BindingSetUpdater extends AbstractObserver {
     private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
-
     // DAO
     private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
@@ -60,6 +61,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
     private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
     private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
     private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
+    private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater();
 
     @Override
     public abstract ObservedColumn getObservedColumn();
@@ -131,6 +133,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
                     throw new RuntimeException("Could not process a Join node.", e);
                 }
                 break;
+                
+            case PERIODIC_QUERY:
+                final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
+                try{
+                    periodicQueryUpdater.updatePeriodicBinResults(tx, observedBindingSet, parentPeriodicQuery);
+                } catch(Exception e) {
+                    throw new RuntimeException("Could not process PeriodicBin node.", e);
+                }
+                break;
 
             case AGGREGATION:
                 final AggregationMetadata parentAggregation = queryDao.readAggregationMetadata(tx, parentNodeId);
@@ -141,8 +152,9 @@ public abstract class BindingSetUpdater extends AbstractObserver {
                 }
                 break;
 
+
             default:
-                throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, or Query, but was " + parentNodeType);
+                throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index f5c7177..ee03334 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.query.BindingSet;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 141ccc7..28e31d8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.query.BindingSet;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
new file mode 100644
index 0000000..e7072e7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+/**
+ * This Observer is responsible for assigning Periodic Bin Ids to BindingSets.
+ * This class delegates to the {@link BindingSetUpdater} process method, which
+ * uses the {@link PeriodicQueryUpdater} to extract the time stamp from the BindingSet.
+ * The PeriodicQueryUpdater creates one instance of the given BindingSet for each bin
+ * that the time stamp is assigned to by the updater, and these BindingSets are written
+ * to the parent node of the given PeriodicQueryMetadata node.
+ *
+ */
+public class PeriodicQueryObserver extends BindingSetUpdater {
+
+    private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+    @Override
+    public ObservedColumn getObservedColumn() {
+        return new ObservedColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET, NotificationType.STRONG);
+    }
+
+    @Override
+    public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception {
+        requireNonNull(tx);
+        requireNonNull(row);
+
+        // Read the Periodic Query metadata for the node ID encoded in the row.
+        final String periodicBinNodeId = BindingSetRow.make(row).getNodeId();
+        final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId);
+
+        // Read the serialized Visibility Binding Set from the observed column and deserialize it.
+        final Bytes valueBytes = tx.get(row, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET);
+        final VisibilityBindingSet periodicBinBindingSet = BS_SERDE.deserialize(valueBytes);
+
+        // Look up the parent node ID; it is handed to the Observation along with the Binding Set.
+        final String parentNodeId = periodicBinMetadata.getParentNodeId();
+
+        return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId);
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index b675ba7..fbdca08 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -29,7 +29,6 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.utils.VisibilitySimplifier;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
@@ -38,6 +37,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporter
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index b0548b4..69a651e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.query.BindingSet;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 3c43885..6fc8e91 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -34,12 +34,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
 
 import com.google.common.base.Charsets;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index 3bc8da6..ff42a0f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -321,6 +321,13 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.varOrder = varOrder;
             return this;
         }
+        
+        /**
+         * @return the variable order of binding sets that are emitted by this node.
+         */
+        public VariableOrder getVariableOrder() {
+            return varOrder;
+        }
 
         /**
          * @param parentNodeId - The Node ID of this node's parent.
@@ -330,6 +337,10 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.parentNodeId = parentNodeId;
             return this;
         }
+       
+        public String getParentNodeId() {
+            return parentNodeId;
+        }
 
         /**
          * @param childNodeId - The Node ID of this node's child.
@@ -360,6 +371,13 @@ public class AggregationMetadata extends CommonNodeMetadata {
             this.groupByVariables = groupByVariables;
             return this;
         }
+        
+        /**
+         * @return variable order that defines how data is grouped for the aggregation function
+         */
+        public VariableOrder getGroupByVariableOrder() {
+            return groupByVariables;
+        }
 
         /**
          * @return An instance of {@link AggregationMetadata} build using this builder's values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
index 8866bd4..7e2e995 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
@@ -18,19 +18,19 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
 import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
 import com.google.common.base.Objects;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
 /**
  * Metadata that is specific to Filter nodes.
  */
@@ -38,8 +38,7 @@ import com.google.common.base.Objects;
 @DefaultAnnotation(NonNull.class)
 public class FilterMetadata extends CommonNodeMetadata {
 
-    private final String originalSparql;
-    private final int filterIndexWithinSparql;
+    private final String filterSparql;
     private final String parentNodeId;
     private final String childNodeId;
 
@@ -48,7 +47,7 @@ public class FilterMetadata extends CommonNodeMetadata {
      *
      * @param nodeId - The ID the Fluo app uses to reference this node. (not null)
      * @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
-     * @param originalSparql - The original SPARQL query the filter is derived from. (not null)
+     * @param filterSparql - SPARQL query representing the filter as generated by {@link FilterSerializer#serialize}. (not null)
      * @param filterIndexWithinSparql - The index of the filter within the original SPARQL query
      *   that this node processes. (not null)
      * @param parentNodeId - The node id of this node's parent. (not null)
@@ -57,14 +56,11 @@ public class FilterMetadata extends CommonNodeMetadata {
     public FilterMetadata(
             final String nodeId,
             final VariableOrder varOrder,
-            final String originalSparql,
-            final int filterIndexWithinSparql,
+            final String filterSparql,
             final String parentNodeId,
             final String childNodeId) {
         super(nodeId, varOrder);
-        this.originalSparql = checkNotNull(originalSparql);
-        checkArgument(filterIndexWithinSparql >= 0 , "filterIndexWithinSparql must be >= 0, was " + filterIndexWithinSparql);
-        this.filterIndexWithinSparql = filterIndexWithinSparql;
+        this.filterSparql = checkNotNull(filterSparql);
         this.parentNodeId = checkNotNull(parentNodeId);
         this.childNodeId = checkNotNull(childNodeId);
     }
@@ -72,16 +68,8 @@ public class FilterMetadata extends CommonNodeMetadata {
     /**
      * @return The original SPARQL query the filter is derived from.
      */
-    public String getOriginalSparql() {
-        return originalSparql;
-    }
-
-    /**
-     * @return The index of the filter within the original SPARQL query that
-     *   this node processes.
-     */
-    public int getFilterIndexWithinSparql() {
-        return filterIndexWithinSparql;
+    public String getFilterSparql() {
+        return filterSparql;
     }
 
     /**
@@ -103,8 +91,7 @@ public class FilterMetadata extends CommonNodeMetadata {
         return Objects.hashCode(
                 super.getNodeId(),
                 super.getVariableOrder(),
-                originalSparql,
-                filterIndexWithinSparql,
+                filterSparql,
                 parentNodeId,
                 childNodeId);
     }
@@ -119,8 +106,7 @@ public class FilterMetadata extends CommonNodeMetadata {
             if(super.equals(o)) {
                 final FilterMetadata filterMetadata = (FilterMetadata)o;
                 return new EqualsBuilder()
-                        .append(originalSparql, filterMetadata.originalSparql)
-                        .append(filterIndexWithinSparql, filterMetadata.filterIndexWithinSparql)
+                        .append(filterSparql, filterMetadata.filterSparql)
                         .append(parentNodeId, filterMetadata.parentNodeId)
                         .append(childNodeId, filterMetadata.childNodeId)
                         .isEquals();
@@ -140,8 +126,7 @@ public class FilterMetadata extends CommonNodeMetadata {
                 .append("    Variable Order: " + super.getVariableOrder() + "\n")
                 .append("    Parent Node ID: " + parentNodeId + "\n")
                 .append("    Child Node ID: " + childNodeId + "\n")
-                .append("    Original SPARQL: " + originalSparql + "\n")
-                .append("    Filter Index Within SPARQL: " + filterIndexWithinSparql + "\n")
+                .append("    Original SPARQL: " + filterSparql + "\n")
                 .append("}")
                 .toString();
     }
@@ -164,8 +149,7 @@ public class FilterMetadata extends CommonNodeMetadata {
 
         private final String nodeId;
         private VariableOrder varOrder;
-        private String originalSparql;
-        private int filterIndexWithinSparql;
+        private String filterSparql;
         private String parentNodeId;
         private String childNodeId;
 
@@ -202,20 +186,8 @@ public class FilterMetadata extends CommonNodeMetadata {
          * @param originalSparql - The original SPARQL query the filter is derived from.
          * @return This builder so that method invocations may be chained.
          */
-        public Builder setOriginalSparql(final String originalSparql) {
-            this.originalSparql = originalSparql;
-            return this;
-        }
-
-        /**
-         * Set the index of the filter within the original SPARQL query that this node processes.
-         *
-         * @param filterIndexWithinSparql - The index of the filter within the original
-         * SPARQL query that this node processes.
-         * @return This builder so that method invocations may be chained.
-         */
-        public Builder setFilterIndexWithinSparql(final int filterIndexWithinSparql) {
-            this.filterIndexWithinSparql = filterIndexWithinSparql;
+        public Builder setFilterSparql(final String originalSparql) {
+            this.filterSparql = originalSparql;
             return this;
         }
 
@@ -248,8 +220,7 @@ public class FilterMetadata extends CommonNodeMetadata {
             return new FilterMetadata(
                     nodeId,
                     varOrder,
-                    originalSparql,
-                    filterIndexWithinSparql,
+                    filterSparql,
                     parentNodeId,
                     childNodeId);
         }