You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:23 UTC

[5/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index f9acb11..dfc3333 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -18,23 +18,30 @@
  */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
 import java.util.Map;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 
-import com.google.common.collect.Sets;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Reads and writes {@link FluoQuery} instances and their components to/from
@@ -50,8 +57,8 @@ public class FluoQueryMetadataDAO {
      * @param metadata - The Query node metadata that will be written to the table. (not null)
      */
     public void write(final TransactionBase tx, final QueryMetadata metadata) {
-        checkNotNull(tx);
-        checkNotNull(metadata);
+        requireNonNull(tx);
+        requireNonNull(metadata);
 
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
@@ -65,19 +72,19 @@ public class FluoQueryMetadataDAO {
      *
      * @param sx - The snapshot that will be used to read the metadata . (not null)
      * @param nodeId - The nodeId of the Query node that will be read. (not nul)
-     * @return The {@link QueryMetadata} that was read from table.
+     * @return The {@link QueryMetadata} that was read from the table.
      */
     public QueryMetadata readQueryMetadata(final SnapshotBase sx, final String nodeId) {
         return readQueryMetadataBuilder(sx, nodeId).build();
     }
 
     private QueryMetadata.Builder readQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
-        checkNotNull(sx);
-        checkNotNull(nodeId);
+        requireNonNull(sx);
+        requireNonNull(nodeId);
 
         // Fetch the values from the Fluo table.
         final String rowId = nodeId;
-        final Map<Column, String> values = sx.gets(rowId, 
+        final Map<Column, String> values = sx.gets(rowId,
                 FluoQueryColumns.QUERY_VARIABLE_ORDER,
                 FluoQueryColumns.QUERY_SPARQL,
                 FluoQueryColumns.QUERY_CHILD_NODE_ID);
@@ -102,8 +109,8 @@ public class FluoQueryMetadataDAO {
      * @param metadata - The Filter node metadata that will be written to the table. (not null)
      */
     public void write(final TransactionBase tx, final FilterMetadata metadata) {
-        checkNotNull(tx);
-        checkNotNull(metadata);
+        requireNonNull(tx);
+        requireNonNull(metadata);
 
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
@@ -119,19 +126,19 @@ public class FluoQueryMetadataDAO {
      *
      * @param sx - The snapshot that will be used to read the metadata. (not null)
      * @param nodeId - The nodeId of the Filter node that will be read. (not nul)
-     * @return The {@link FilterMetadata} that was read from table.
+     * @return The {@link FilterMetadata} that was read from the table.
      */
     public FilterMetadata readFilterMetadata(final SnapshotBase sx, final String nodeId) {
         return readFilterMetadataBuilder(sx, nodeId).build();
     }
 
     private FilterMetadata.Builder readFilterMetadataBuilder(final SnapshotBase sx, final String nodeId) {
-        checkNotNull(sx);
-        checkNotNull(nodeId);
+        requireNonNull(sx);
+        requireNonNull(nodeId);
 
         // Fetch the values from the Fluo table.
         final String rowId = nodeId;
-        final Map<Column, String> values = sx.gets(rowId, 
+        final Map<Column, String> values = sx.gets(rowId,
                 FluoQueryColumns.FILTER_VARIABLE_ORDER,
                 FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
                 FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
@@ -162,8 +169,8 @@ public class FluoQueryMetadataDAO {
      * @param metadata - The Join node metadata that will be written to the table. (not null)
      */
     public void write(final TransactionBase tx, final JoinMetadata metadata) {
-        checkNotNull(tx);
-        checkNotNull(metadata);
+        requireNonNull(tx);
+        requireNonNull(metadata);
 
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
@@ -179,15 +186,15 @@ public class FluoQueryMetadataDAO {
      *
      * @param sx - The snapshot that will be used to read the metadata. (not null)
      * @param nodeId - The nodeId of the Join node that will be read. (not nul)
-     * @return The {@link JoinMetadata} that was read from table.
+     * @return The {@link JoinMetadata} that was read from the table.
      */
     public JoinMetadata readJoinMetadata(final SnapshotBase sx, final String nodeId) {
         return readJoinMetadataBuilder(sx, nodeId).build();
     }
 
     private JoinMetadata.Builder readJoinMetadataBuilder(final SnapshotBase sx, final String nodeId) {
-        checkNotNull(sx);
-        checkNotNull(nodeId);
+        requireNonNull(sx);
+        requireNonNull(nodeId);
 
         // Fetch the values from the Fluo table.
         final String rowId = nodeId;
@@ -224,8 +231,8 @@ public class FluoQueryMetadataDAO {
      * @param metadata - The Statement Pattern node metadata that will be written to the table. (not null)
      */
     public void write(final TransactionBase tx, final StatementPatternMetadata metadata) {
-        checkNotNull(tx);
-        checkNotNull(metadata);
+        requireNonNull(tx);
+        requireNonNull(metadata);
 
         final String rowId = metadata.getNodeId();
         tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId);
@@ -239,15 +246,15 @@ public class FluoQueryMetadataDAO {
      *
      * @param sx - The snapshot that will be used to read the metadata. (not null)
      * @param nodeId - The nodeId of the Statement Pattern node that will be read. (not nul)
-     * @return The {@link StatementPatternMetadata} that was read from table.
+     * @return The {@link StatementPatternMetadata} that was read from the table.
      */
     public StatementPatternMetadata readStatementPatternMetadata(final SnapshotBase sx, final String nodeId) {
         return readStatementPatternMetadataBuilder(sx, nodeId).build();
     }
 
     private StatementPatternMetadata.Builder readStatementPatternMetadataBuilder(final SnapshotBase sx, final String nodeId) {
-        checkNotNull(sx);
-        checkNotNull(nodeId);
+        requireNonNull(sx);
+        requireNonNull(nodeId);
 
         // Fetch the values from the Fluo table.
         final String rowId = nodeId;
@@ -270,14 +277,104 @@ public class FluoQueryMetadataDAO {
     }
 
     /**
+     * Write an instance of {@link AggregationMetadata} to the Fluo table.
+     *
+     * @param tx - The transaction that will be used to commit the metadata. (not null)
+     * @param metadata - The Aggregation node metadata that will be written to the table. (not null)
+     */
+    public void write(final TransactionBase tx, final AggregationMetadata metadata) {
+        requireNonNull(tx);
+        requireNonNull(metadata);
+
+        final String rowId = metadata.getNodeId();
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_NODE_ID, rowId);
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_PARENT_NODE_ID, metadata.getParentNodeId());
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_CHILD_NODE_ID, metadata.getChildNodeId());
+
+        // Store the Group By variable order.
+        final VariableOrder groupByVars = metadata.getGroupByVariableOrder();
+        final String groupByString = Joiner.on(";").join(groupByVars.getVariableOrders());
+        tx.set(rowId, FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES, groupByString);
+
+        // Serialize the collection of AggregationElements.
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try(final ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject( metadata.getAggregations() );
+        } catch (final IOException e) {
+            throw new RuntimeException("Problem encountered while writing AggregationMetadata to the Fluo table. Unable " +
+                    "to serialize the AggregationElements to a byte[].", e);
+        }
+        tx.set(Bytes.of(rowId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS, Bytes.of(baos.toByteArray()));
+    }
+
+    /**
+     * Read an instance of {@link AggregationMetadata} from the Fluo table.
+     *
+     * @param sx - The snapshot that will be used to read the metadata. (not null)
+     * @param nodeId - The nodeId of the Aggregation node that will be read. (not null)
+     * @return The {@link AggregationMetadata} that was read from the table.
+     */
+    public AggregationMetadata readAggregationMetadata(final SnapshotBase sx, final String nodeId) {
+        return readAggregationMetadataBuilder(sx, nodeId).build();
+    }
+
+    private AggregationMetadata.Builder readAggregationMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+        requireNonNull(sx);
+        requireNonNull(nodeId);
+
+        // Fetch the values from the Fluo table.
+        final String rowId = nodeId;
+        final Map<Column, String> values = sx.gets(rowId,
+                FluoQueryColumns.AGGREGATION_VARIABLE_ORDER,
+                FluoQueryColumns.AGGREGATION_PARENT_NODE_ID,
+                FluoQueryColumns.AGGREGATION_CHILD_NODE_ID,
+                FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES);
+
+
+        // Return an object holding them.
+        final String varOrderString = values.get(FluoQueryColumns.AGGREGATION_VARIABLE_ORDER);
+        final VariableOrder varOrder = new VariableOrder(varOrderString);
+
+        final String parentNodeId = values.get(FluoQueryColumns.AGGREGATION_PARENT_NODE_ID);
+        final String childNodeId = values.get(FluoQueryColumns.AGGREGATION_CHILD_NODE_ID);
+
+        // Read the Group By variable order if one was present.
+        final String groupByString = values.get(FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES);
+        final VariableOrder groupByVars = groupByString.isEmpty() ? new VariableOrder() : new VariableOrder( groupByString.split(";") );
+
+        // Deserialize the collection of AggregationElements.
+        final Bytes aggBytes = sx.get(Bytes.of(nodeId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS);
+        final Collection<AggregationElement> aggregations;
+        try(final ObjectInputStream ois = new ObjectInputStream(aggBytes.toInputStream())) {
+             aggregations = (Collection<AggregationElement>)ois.readObject();
+        } catch (final IOException | ClassNotFoundException e) {
+            throw new RuntimeException("Problem encountered while reading AggregationMetadata from the Fluo table. Unable " +
+                    "to deserialize the AggregationElements from a byte[].", e);
+        }
+
+        final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId)
+                .setVariableOrder(varOrder)
+                .setParentNodeId(parentNodeId)
+                .setChildNodeId(childNodeId)
+                .setGroupByVariableOrder(groupByVars);
+
+        for(final AggregationElement aggregation : aggregations) {
+            builder.addAggregation(aggregation);
+        }
+
+        return builder;
+    }
+
+    /**
      * Write an instance of {@link FluoQuery} to the Fluo table.
      *
      * @param tx - The transaction that will be used to commit the metadata. (not null)
      * @param query - The query metadata that will be written to the table. (not null)
      */
     public void write(final TransactionBase tx, final FluoQuery query) {
-        checkNotNull(tx);
-        checkNotNull(query);
+        requireNonNull(tx);
+        requireNonNull(query);
 
         // Store the Query ID so that it may be looked up from the original SPARQL string.
         final String sparql = query.getQueryMetadata().getSparql();
@@ -298,6 +395,10 @@ public class FluoQueryMetadataDAO {
         for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
             write(tx, statementPattern);
         }
+
+        for(final AggregationMetadata aggregation : query.getAggregationMetadata()) {
+            write(tx, aggregation);
+        }
     }
 
     /**
@@ -308,8 +409,8 @@ public class FluoQueryMetadataDAO {
      * @return The {@link FluoQuery} that was read from table.
      */
     public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) {
-        checkNotNull(sx);
-        checkNotNull(queryId);
+        requireNonNull(sx);
+        requireNonNull(queryId);
 
         final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
         addChildMetadata(sx, fluoQueryBuilder, queryId);
@@ -317,9 +418,9 @@ public class FluoQueryMetadataDAO {
     }
 
     private void addChildMetadata(final SnapshotBase sx, final FluoQuery.Builder builder, final String childNodeId) {
-        checkNotNull(sx);
-        checkNotNull(builder);
-        checkNotNull(childNodeId);
+        requireNonNull(sx);
+        requireNonNull(builder);
+        requireNonNull(childNodeId);
 
         final NodeType childType = NodeType.fromNodeId(childNodeId).get();
         switch(childType) {
@@ -357,6 +458,15 @@ public class FluoQueryMetadataDAO {
                 final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
                 builder.addStatementPatternBuilder(spBuilder);
                 break;
+
+            case AGGREGATION:
+                // Add this node's metadata.
+                final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
+                builder.addAggregateMetadata(aggregationBuilder);
+
+                // Add it's child's metadata.
+                addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
+                break;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 2128700..562470a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -29,32 +30,40 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Extension;
 import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.GroupElem;
 import org.openrdf.query.algebra.Join;
 import org.openrdf.query.algebra.LeftJoin;
 import org.openrdf.query.algebra.Projection;
 import org.openrdf.query.algebra.QueryModelNode;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
 import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
 import org.openrdf.query.parser.ParsedQuery;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import net.jcip.annotations.Immutable;
+
 /**
  * Creates the {@link FluoQuery} metadata that is required by the Fluo
  * application to process a SPARQL query.
@@ -119,7 +128,7 @@ public class SparqlFluoQueryBuilder {
          */
         public Optional<String> getId(final QueryModelNode node) {
             checkNotNull(node);
-            return Optional.fromNullable( nodeIds.get(node) );
+            return Optional.ofNullable( nodeIds.get(node) );
         }
 
         /**
@@ -157,14 +166,15 @@ public class SparqlFluoQueryBuilder {
                 prefix = JOIN_PREFIX;
             } else if(node instanceof Projection) {
                 prefix = QUERY_PREFIX;
+            } else if(node instanceof Extension) {
+                prefix = AGGREGATION_PREFIX;
             } else {
-                throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Projection} but was " + node.getClass());
+                throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
             }
 
             // Create the unique portion of the id.
             final String unique = UUID.randomUUID().toString().replaceAll("-", "");
 
-
             // Put them together to create the Node ID.
             return prefix + "_" + unique;
         }
@@ -204,6 +214,77 @@ public class SparqlFluoQueryBuilder {
             this.nodeIds = checkNotNull(nodeIds);
         }
 
+        /**
+         * If we encounter an Extension node that contains a Group, then we've found an aggregation.
+         */
+        @Override
+        public void meet(final Extension node) {
+            final TupleExpr arg = node.getArg();
+            if(arg instanceof Group) {
+                final Group group = (Group) arg;
+
+                // Get the Aggregation Node's id.
+                final String aggregationId = nodeIds.getOrMakeId(node);
+
+                // Get the group's child node id. This call forces it to be a supported child type.
+                final TupleExpr child = group.getArg();
+                final String childNodeId = nodeIds.getOrMakeId( child );
+
+                // Get the list of group by binding names.
+                VariableOrder groupByVariableOrder = null;
+                if(!group.getGroupBindingNames().isEmpty()) {
+                    groupByVariableOrder = new VariableOrder(group.getGroupBindingNames());
+                } else {
+                    groupByVariableOrder = new VariableOrder();
+                }
+
+                // The aggregations that need to be performed are the Group Elements.
+                final List<AggregationElement> aggregations = new ArrayList<>();
+                for(final GroupElem groupElem : group.getGroupElements()) {
+                    // Figure out the type of the aggregation.
+                    final AggregateOperator operator = groupElem.getOperator();
+                    final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() );
+
+                    // If the type is one we support, create the AggregationElement.
+                    if(type.isPresent()) {
+                        final String resultBindingName = groupElem.getName();
+
+                        final AtomicReference<String> aggregatedBindingName = new AtomicReference<>();
+                        groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() {
+                            @Override
+                            public void meet(final Var node) {
+                                aggregatedBindingName.set( node.getName() );
+                            }
+                        });
+
+                        aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) );
+                    }
+                }
+
+                // Update the aggregation's metadata.
+                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(aggregationId).orNull();
+                if(aggregationBuilder == null) {
+                    aggregationBuilder = AggregationMetadata.builder(aggregationId);
+                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+                }
+
+                aggregationBuilder.setChildNodeId(childNodeId);
+                aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder);
+                for(final AggregationElement aggregation : aggregations) {
+                    aggregationBuilder.addAggregation(aggregation);
+                }
+
+                // Update the child node's metadata.
+                final Set<String> childVars = getVars(child);
+                final VariableOrder childVarOrder = new VariableOrder(childVars);
+
+                setChildMetadata(childNodeId, childVarOrder, aggregationId);
+            }
+
+            // Walk to the next node.
+            super.meet(node);
+        }
+
         @Override
         public void meet(final StatementPattern node) {
             // Extract metadata that will be stored from the node.
@@ -386,10 +467,21 @@ public class SparqlFluoQueryBuilder {
                     filterBuilder.setParentNodeId(parentNodeId);
                     break;
 
-            case QUERY:
-                throw new IllegalArgumentException("QUERY nodes do not have children.");
-            default:
-                throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+                case AGGREGATION:
+                    AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+                    if(aggregationBuilder == null) {
+                        aggregationBuilder = AggregationMetadata.builder(childNodeId);
+                        fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+                    }
+
+                    aggregationBuilder.setVariableOrder(childVarOrder);
+                    aggregationBuilder.setParentNodeId(parentNodeId);
+                    break;
+
+                case QUERY:
+                    throw new IllegalArgumentException("QUERY nodes do not have children.");
+                default:
+                    throw new IllegalArgumentException("Unsupported NodeType: " + childType);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
new file mode 100644
index 0000000..30f026c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * A utility class that defines functions that make it easier to work with {@link BindingSet} objects.
+ */
+public class BindingSetUtil {
+
+    /**
+     * Create a new {@link BindingSet} that only includes the bindings whose names appear within the {@code variableOrder}.
+     * If no binding is found for a variable, then that binding is just omitted from the resulting object.
+     *
+     * @param variableOrder - Defines which bindings will be kept. (not null)
+     * @param bindingSet - Contains the source {@link Binding}s. (not null)
+     * @return A new {@link BindingSet} containing only the specified bindings.
+     */
+    public static BindingSet keepBindings(final VariableOrder variableOrder, final BindingSet bindingSet) {
+        requireNonNull(variableOrder);
+        requireNonNull(bindingSet);
+
+        final MapBindingSet result = new MapBindingSet();
+        for(final String bindingName : variableOrder) {
+            if(bindingSet.hasBinding(bindingName)) {
+                final Binding binding = bindingSet.getBinding(bindingName);
+                result.addBinding(binding);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
new file mode 100644
index 0000000..ffb2320
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Charsets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * TODO doc that this implements utility functions used to create the Fluo Row Keys used when referencing the binding
+ * set results of a query node.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RowKeyUtil {
+
+    private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+
+    /**
+     * Creates the Row Key that will be used by a node within the PCJ Fluo application to represent where a specific
+     * result of that node will be placed.
+     *
+     * @param nodeId - Identifies the Node that the Row Key is for. (not null)
+     * @param variableOrder - Specifies which bindings from {@code bindingSet} will be included within the Row Key as
+     *   well as the order they will appear. (not null)
+     * @param bindingSet - The Binding Set whose values will be used to create the Row Key. (not null)
+     * @return A Row Key built using the provided values.
+     */
+    public static Bytes makeRowKey(final String nodeId, final VariableOrder variableOrder, final BindingSet bindingSet) {
+        requireNonNull(nodeId);
+        requireNonNull(variableOrder);
+        requireNonNull(bindingSet);
+
+        // The Row Key starts with the Node ID of the node the result belongs to.
+        String rowId = nodeId + IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+        // Append the String formatted bindings that are included in the Variable Order. The Variable Order also defines
+        // the order the binding will be written to the Row Key. If a Binding is missing for one of the Binding Names
+        // that appears within the Variable Order, then an empty value will be written for that location within the Row Key.
+        rowId += BS_CONVERTER.convert(bindingSet, variableOrder);
+
+        // Format the Row Key as a UTF 8 encoded Bytes object.
+        return Bytes.of( rowId.getBytes(Charsets.UTF_8) );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
new file mode 100644
index 0000000..99791ee
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Tests the methods of {@link VisibilityBindingSetSerDe}.
+ */
+public class VisibilityBindingSetSerDeTest {
+
+    @Test
+    public void rountTrip() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+
+        final MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("name", vf.createLiteral("Alice"));
+        bs.addBinding("age", vf.createLiteral(5));
+        final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
+
+        final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
+        final Bytes bytes = serde.serialize(original);
+        final VisibilityBindingSet result = serde.deserialize(bytes);
+
+        assertEquals(original, result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 43dac3c..854798d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -135,7 +135,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
             // Tell the Fluo PCJ Updater app to maintain the PCJ.
             createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, accumulo, ryaTablePrefix);
 
-        } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+        } catch (MalformedQueryException | PcjException | RyaDAOException e) {
             throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 105f697..c8dc737 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,19 +18,29 @@
  */
 package org.apache.rya.indexing.pcj.fluo.demo;
 
+import java.io.IOException;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.openrdf.model.Statement;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.MalformedQueryException;
@@ -45,16 +55,6 @@ import org.openrdf.sail.SailException;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-
 /**
  * Demonstrates historicly added Rya statements that are stored within the core
  * Rya tables joining with newly streamed statements into the Fluo application.
@@ -181,7 +181,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
             // Tell the Fluo app to maintain it.
             new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
 
-        } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+        } catch (MalformedQueryException | PcjException | RyaDAOException e) {
             throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
         }
 
@@ -192,11 +192,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
 
         // 5. Show that the Fluo app exported the results to the PCJ table in Accumulo.
         log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:");
-        try {
-            for(final BindingSet result : pcjStorage.listResults(pcjId)) {
-                log.info("    " + result);
+        try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+            while(resultsIt.hasNext()) {
+                log.info("    " + resultsIt.next());
             }
-        } catch (final PCJStorageException e) {
+        } catch (final Exception e) {
             throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e);
         }
         waitForEnter();
@@ -257,11 +257,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
 
         // 8. Show the new results have been exported to the PCJ table in Accumulo.
         log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:");
-        try {
-            for(final BindingSet result : pcjStorage.listResults(pcjId)) {
-                log.info("    " + result);
+        try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+            while(resultsIt.hasNext()) {
+                log.info("    " + resultsIt.next());
             }
-        } catch (final PCJStorageException e) {
+        } catch (final Exception e) {
             throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e);
         }
         log.info("");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index ab99ecd..9263362 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -41,6 +41,11 @@
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.indexing</artifactId>
         </dependency>
+         <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+
         <!-- Testing dependencies. -->
         <dependency>
             <groupId>org.apache.fluo</groupId>
@@ -86,8 +91,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
-
-
+        <dependency>
+             <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
deleted file mode 100644
index 6e696c8..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.openrdf.model.Statement;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.sail.Sail;
-
-import com.google.common.io.Files;
-
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-
-/**
- * Integration tests that ensure the Fluo application processes PCJs results
- * correctly.
- * <p>
- * This class is being ignored because it doesn't contain any unit tests.
- */
-public abstract class ITBase {
-    private static final Logger log = Logger.getLogger(ITBase.class);
-
-    // Rya data store and connections.
-    protected static final String RYA_INSTANCE_NAME = "demo_";
-    protected RyaSailRepository ryaRepo = null;
-    protected RepositoryConnection ryaConn = null;
-
-    // Mini Accumulo Cluster
-    protected static final String ACCUMULO_USER = "root";
-    protected static final String ACCUMULO_PASSWORD = "password";
-    protected MiniAccumuloCluster cluster;
-    protected static Connector accumuloConn = null;
-    protected String instanceName = null;
-    protected String zookeepers = null;
-
-    // Fluo data store and connections.
-    protected static final String FLUO_APP_NAME = "IntegrationTests";
-    protected MiniFluo fluo = null;
-    protected FluoClient fluoClient = null;
-
-    @BeforeClass
-    public static void killLoudLogs() {
-        Logger.getRootLogger().setLevel(Level.ERROR);
-        Logger.getLogger(ClientCnxn.class).setLevel(Level.OFF);
-    }
-
-    @Before
-    public void setupMiniResources() throws Exception {
-        // Will set defaults for log4J
-        org.apache.log4j.BasicConfigurator.configure();
-    	// Initialize the Mini Accumulo that will be used to host Rya and Fluo.
-    	setupMiniAccumulo();
-
-        // Initialize the Mini Fluo that will be used to store created queries.
-        fluo = startMiniFluo();
-        fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
-
-        // Initialize the Rya that will be used by the tests.
-        ryaRepo = setupRya(instanceName, zookeepers);
-        ryaConn = ryaRepo.getConnection();
-    }
-
-    @After
-    public void shutdownMiniResources() {
-        if (ryaConn != null) {
-            try {
-                log.info("Shutting down Rya Connection.");
-                ryaConn.close();
-                log.info("Rya Connection shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Rya Connection.", e);
-            }
-        }
-
-        if (ryaRepo != null) {
-            try {
-                log.info("Shutting down Rya Repo.");
-                ryaRepo.shutDown();
-                log.info("Rya Repo shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Rya Repo.", e);
-            }
-        }
-
-        if (fluoClient != null) {
-            try {
-                log.info("Shutting down Fluo Client.");
-                fluoClient.close();
-                log.info("Fluo Client shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Fluo Client.", e);
-            }
-        }
-
-        if (fluo != null) {
-            try {
-                log.info("Shutting down Mini Fluo.");
-                fluo.close();
-                log.info("Mini Fluo shut down.");
-            } catch (final Exception e) {
-                log.error("Could not shut down the Mini Fluo.", e);
-            }
-        }
-
-        if(cluster != null) {
-            try {
-                log.info("Shutting down the Mini Accumulo being used as a Rya store.");
-                cluster.stop();
-                log.info("Mini Accumulo being used as a Rya store shut down.");
-            } catch(final Exception e) {
-                log.error("Could not shut down the Mini Accumulo.", e);
-            }
-        }
-    }
-
-    /**
-     * A helper fuction for creating a {@link BindingSet} from an array of
-     * {@link Binding}s.
-     *
-     * @param bindings - The bindings to include in the set. (not null)
-     * @return A {@link BindingSet} holding the bindings.
-     */
-    protected static BindingSet makeBindingSet(final Binding... bindings) {
-        final MapBindingSet bindingSet = new MapBindingSet();
-        for (final Binding binding : bindings) {
-            bindingSet.addBinding(binding);
-        }
-        return bindingSet;
-    }
-
-    /**
-     * A helper function for creating a {@link RyaStatement} that represents a
-     * Triple.
-     *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
-     * @return A Triple as a {@link RyaStatement}.
-     */
-    protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) {
-        checkNotNull(subject);
-        checkNotNull(predicate);
-        checkNotNull(object);
-
-        final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject))
-                .setPredicate(new RyaURI(predicate));
-
-        if (object.startsWith("http://") || object.startsWith("tag:")) {
-            builder.setObject(new RyaURI(object));
-        } else {
-            builder.setObject(new RyaType(object));
-        }
-
-        return builder.build();
-    }
-
-    /**
-     * A helper function for creating a {@link RyaStatement} that represents a Triple.
-     * This overload takes a typed literal for the object. Prepare it like this for example specify the type (wktLiteral) and the value (Point...):
-     * makeRyaStatement(s, p, new RyaType(new URIImpl("http://www.opengis.net/ont/geosparql#wktLiteral"), "Point(-77.03524 38.889468)")) //
-     *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
-     * @return A Triple as a {@link RyaStatement}.
-     */
-    protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final RyaType object) {
-        checkNotNull(subject);
-        checkNotNull(predicate);
-        checkNotNull(object);
-
-        final RyaStatementBuilder builder = RyaStatement.builder()//
-                        .setSubject(new RyaURI(subject))//
-                        .setPredicate(new RyaURI(predicate))//
-                        .setObject(object);
-        return builder.build();
-    }
-
-    /**
-     * A helper function for creating a {@link RyaStatement} that represents a Triple with an integer.
-     *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple, an integer value (int).
-     * @return A Triple as a {@link RyaStatement}.
-     */
-    protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) {
-        checkNotNull(subject);
-        checkNotNull(predicate);
-
-        return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate))
-                .setObject(new RyaType(XMLSchema.INT, "" + object)).build();
-    }
-
-    /**
-     * A helper function for creating a Sesame {@link Statement} that represents
-     * a Triple..
-     *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
-     * @return A Triple as a {@link Statement}.
-     */
-    protected static Statement makeStatement(final String subject, final String predicate, final String object) {
-        checkNotNull(subject);
-        checkNotNull(predicate);
-        checkNotNull(object);
-
-        final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object);
-        return RyaToRdfConversions.convertStatement(ryaStmt);
-    }
-
-    /**
-     * Fetches the binding sets that are the results of a specific SPARQL query from the Fluo table.
-     *
-     * @param fluoClient- A connection to the Fluo table where the results reside. (not null)
-     * @param sparql - This query's results will be fetched. (not null)
-     * @return The binding sets for the query's results.
-     */
-    protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) {
-        final Set<BindingSet> bindingSets = new HashSet<>();
-
-        try (Snapshot snapshot = fluoClient.newSnapshot()) {
-            final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString();
-
-            // Fetch the query's variable order.
-            final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
-            final VariableOrder varOrder = queryMetadata.getVariableOrder();
-
-            CellScanner cellScanner = snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build();
-            final BindingSetStringConverter converter = new BindingSetStringConverter();
-
-           Iterator<RowColumnValue> iter = cellScanner.iterator();
-            
-            while (iter.hasNext()) {
-            	final String bindingSetString = iter.next().getsValue();
-                final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
-                bindingSets.add(bindingSet);
-            }
-        }
-
-        return bindingSets;
-    }
-
-    private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
-    	final File miniDataDir = Files.createTempDir();
-
-    	// Setup and start the Mini Accumulo.
-    	final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD);
-    	cluster = new MiniAccumuloCluster(cfg);
-    	cluster.start();
-
-    	// Store a connector to the Mini Accumulo.
-    	instanceName = cluster.getInstanceName();
-    	zookeepers = cluster.getZooKeepers();
-
-    	final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
-    	accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD));
-    }
-
-     /**
-      * Sets up a Rya instance.
-      */
-    protected static RyaSailRepository setupRya(final String instanceName, final String zookeepers) throws Exception {
-        checkNotNull(instanceName);
-        checkNotNull(zookeepers);
-
-        // Install the Rya instance to the mini accumulo cluster.
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                instanceName,
-                zookeepers), accumuloConn);
-
-        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
-                .setEnableTableHashPrefix(false)
-                .setEnableFreeTextIndex(true)
-                .setEnableEntityCentricIndex(true)
-                .setEnableGeoIndex(true)
-                .setEnableTemporalIndex(true)
-                .setEnablePcjIndex(true)
-                .setFluoPcjAppName(FLUO_APP_NAME)
-                .build());
-
-        // Connect to the Rya instance that was just installed.
-        final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
-        final Sail sail = RyaSailFactory.getInstance(conf);
-        final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
-        return ryaRepo;
-    }
-
-    protected static AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(RYA_INSTANCE_NAME);
-        // Accumulo connection information.
-        conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
-        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
-        // PCJ configuration information.
-        conf.set(ConfigUtils.USE_PCJ, "true");
-        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
-        conf.set(ConfigUtils.FLUO_APP_NAME, FLUO_APP_NAME);
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
-                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
-                PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-
-        return conf;
-    }
-
-    protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
-        // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverSpecification> observers = new ArrayList<>();
-        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
-        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
-        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
-
-        // Set export details for exporting from Fluo to a Rya repository and a subscriber queue.
-        final HashMap<String, String> exportParams = new HashMap<>();
-        setExportParameters(exportParams);
-        
-        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
-        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
-        observers.add(exportObserverConfig);
-
-        // Configure how the mini fluo will run.
-        final FluoConfiguration config = new FluoConfiguration();
-        config.setMiniStartAccumulo(false);
-        config.setAccumuloInstance(instanceName);
-        config.setAccumuloUser(ACCUMULO_USER);
-        config.setAccumuloPassword(ACCUMULO_PASSWORD);
-        config.setInstanceZookeepers(zookeepers + "/fluo");
-        config.setAccumuloZookeepers(zookeepers);
-
-        config.setApplicationName(FLUO_APP_NAME);
-        config.setAccumuloTable("fluo" + FLUO_APP_NAME);
-
-        config.addObservers(observers);
-
-        FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
-        return FluoFactory.newMiniFluo(config);
-    }
-
-    /**
-     * Set export details for exporting from Fluo to a Rya repository and a subscriber queue.
-     * Override this if you have custom export destinations.
-     * 
-     * @param exportParams
-     */
-    protected void setExportParameters(final HashMap<String, String> exportParams) {
-        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
-        ryaParams.setExportToRya(true);
-        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
-        ryaParams.setAccumuloInstanceName(instanceName);
-        ryaParams.setZookeeperServers(zookeepers);
-        ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
-        ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
new file mode 100644
index 0000000..cd84cb9
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.sail.Sail;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a Kakfa topic.
+ */
+public class KafkaExportITBase extends AccumuloExportITBase {
+
+    protected static final String RYA_INSTANCE_NAME = "test_";
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private ZkUtils zkUtils;
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+
+    // The Rya instance statements are written to that will be fed into the Fluo app.
+    private RyaSailRepository ryaSailRepo = null;
+
+    /**
+     * Add info about the Kafka queue/topic to receive the export.
+     *
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
+     */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+        final HashMap<String, String> exportParams = new HashMap<>();
+
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(true);
+
+        // Configure the Kafka Producer
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        kafkaParams.addAllProducerConfig(producerConfig);
+
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    /**
+     * setup mini kafka and call the super to setup mini fluo
+     *
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
+     */
+    @Before
+    public void setupKafka() throws Exception {
+        // Install an instance of Rya on the Accumulo cluster.
+        installRyaInstance();
+
+        // Setup Kafka.
+        zkServer = new EmbeddedZookeeper();
+        final String zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        final KafkaConfig config = new KafkaConfig(brokerProps);
+        final Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+    }
+
+    @After
+    public void teardownRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Uninstall the instance of Rya.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+
+        // Shutdown the repo.
+        ryaSailRepo.shutDown();
+    }
+
+    private void installRyaInstance() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Install the Rya instance to the mini accumulo cluster.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableGeoIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(true)
+                .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() )
+                .build());
+
+        // Connect to the Rya instance that was just installed.
+        final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        ryaSailRepo = new RyaSailRepository(sail);
+    }
+
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // Accumulo connection information.
+        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
+        conf.setAuths("");
+
+
+        // PCJ configuration information.
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
+        conf.setDisplayQueryPlan(true);
+
+        return conf;
+    }
+
+    /**
+     * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return ryaSailRepo;
+    }
+
+    /**
+     * Close all the Kafka mini server and mini-zookeeper
+     *
+     * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
+     */
+    @After
+    public void teardownKafka() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    /**
+     * Test kafka without rya code to make sure kafka works in this environment.
+     * If this test fails then its a testing environment issue, not with Rya.
+     * Source: https://github.com/asmaier/mini-kafka
+     */
+    @Test
+    public void embeddedKafkaTest() throws Exception {
+        // create topic
+        final String topic = "testTopic";
+        AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+        // setup producer
+        final Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
+
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty("group.id", "group0");
+        consumerProps.setProperty("client.id", "consumer0");
+        consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put("auto.offset.reset", "earliest");
+
+        final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(topic));
+
+        // send message
+        final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
+        producer.send(data);
+        producer.close();
+
+        // starting consumer
+        final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+        assertEquals(1, records.count());
+        final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
+        final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
+        assertEquals(42, (int) record.key());
+        assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
+        consumer.close();
+    }
+
+    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(TopicName));
+        return consumer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
new file mode 100644
index 0000000..5fe999f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.openrdf.sail.Sail;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index.
+ */
+public class RyaExportITBase extends AccumuloExportITBase {
+
+    protected static final String RYA_INSTANCE_NAME = "test_";
+
+    private RyaSailRepository ryaSailRepo = null;
+
+    public RyaExportITBase() {
+        // Indicates that MiniFluo should be started before each test.
+        super(true);
+    }
+
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+        Logger.getRootLogger().setLevel(Level.ERROR);
+    }
+
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+        final HashMap<String, String> exportParams = new HashMap<>();
+        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
+        ryaParams.setExportToRya(true);
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
+        ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
+        ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
+        ryaParams.setExporterUsername(ACCUMULO_USER);
+        ryaParams.setExporterPassword(ACCUMULO_PASSWORD);
+
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    @Before
+    public void setupRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Install the Rya instance to the mini accumulo cluster.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableGeoIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(true)
+                .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() )
+                .build());
+
+        // Connect to the Rya instance that was just installed.
+        final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        ryaSailRepo = new RyaSailRepository(sail);
+    }
+
+    @After
+    public void teardownRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Uninstall the instance of Rya.
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                    ACCUMULO_USER,
+                    ACCUMULO_PASSWORD.toCharArray(),
+                    instanceName,
+                    zookeepers),
+                super.getAccumuloConnector());
+
+        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+
+        // Shutdown the repo.
+        ryaSailRepo.shutDown();
+    }
+
+    /**
+     * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return ryaSailRepo;
+    }
+
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // Accumulo connection information.
+        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
+        conf.setAuths("");
+
+        // PCJ configuration information.
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
+        conf.setDisplayQueryPlan(true);
+
+        return conf;
+    }
+}
\ No newline at end of file