You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 13:15:36 UTC

incubator-rya git commit: RYA-105: Fluo Integration

Repository: incubator-rya
Updated Branches:
  refs/heads/develop dbd46e7a7 -> d9aaca79c


RYA-105: Fluo Integration


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d9aaca79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d9aaca79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d9aaca79

Branch: refs/heads/develop
Commit: d9aaca79cc1c69ca049c794f8b35e72e877e808e
Parents: dbd46e7
Author: Caleb Meier <me...@gmail.com>
Authored: Wed Jul 6 12:26:38 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Thu Jul 21 09:09:39 2016 -0400

----------------------------------------------------------------------
 .../java/mvm/rya/accumulo/AccumuloRyaDAO.java   |  62 ++--
 .../mvm/rya/indexing/accumulo/ConfigUtils.java  |  48 ++-
 .../external/PrecomputedJoinIndexer.java        | 130 ++++---
 .../external/PrecomputedJoinIndexerConfig.java  |  15 +-
 .../external/fluo/FluoPcjUpdaterConfig.java     |   5 +-
 .../external/fluo/FluoPcjUpdaterSupplier.java   |  17 +-
 .../rya/indexing/external/fluo/NoOpUpdater.java |  53 +++
 .../external/fluo/NoOpUpdaterSupplier.java      |  39 +++
 .../fluo/PcjUpdaterSupplierFactory.java         |  57 +++
 .../rya.pcj.fluo/pcj.fluo.integration/pom.xml   |  47 ++-
 .../apache/rya/indexing/pcj/fluo/ITBase.java    | 345 +++++++++++--------
 .../pcj/fluo/api/CountStatementsIT.java         |  23 +-
 .../pcj/fluo/integration/RyaExportIT.java       |  16 +-
 .../RyaInputIncrementalUpdateIT.java            | 200 +++++++++++
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |  18 +-
 15 files changed, 776 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
index b10c522..ed991e1 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
@@ -8,9 +8,9 @@ package mvm.rya.accumulo;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
 import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
 import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
 import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
+import info.aduna.iteration.CloseableIteration;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -40,6 +41,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.resolver.RyaTripleContext;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchDeleter;
@@ -64,21 +77,6 @@ import org.openrdf.model.Namespace;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
-import info.aduna.iteration.CloseableIteration;
-import mvm.rya.accumulo.experimental.AccumuloIndexer;
-import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TableLayoutStrategy;
-import mvm.rya.api.persist.RyaDAO;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.persist.RyaNamespaceManager;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
 public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
     private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
 
@@ -111,8 +109,9 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
     @Override
     public void init() throws RyaDAOException {
-        if (initialized)
+        if (initialized) {
             return;
+        }
         try {
             checkNotNull(conf);
             checkNotNull(connector);
@@ -131,7 +130,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             secondaryIndexers = conf.getAdditionalIndexers();
 
             flushEachUpdate = conf.flushEachUpdate();
-            
+
             TableOperations tableOperations = connector.tableOperations();
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
@@ -150,7 +149,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
 
             bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
-            
+
             for (AccumuloIndexer index : secondaryIndexers) {
                index.setConnector(connector);
                index.setMultiTableBatchWriter(mt_bw);
@@ -248,9 +247,15 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         } catch (Exception e) {
             throw new RyaDAOException(e);
         } finally {
-            if (bd_spo != null) bd_spo.close();
-            if (bd_po != null) bd_po.close();
-            if (bd_osp != null) bd_osp.close();
+            if (bd_spo != null) {
+                bd_spo.close();
+            }
+            if (bd_po != null) {
+                bd_po.close();
+            }
+            if (bd_osp != null) {
+                bd_osp.close();
+            }
         }
 
     }
@@ -520,10 +525,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
     }
 
-    private void checkVersion() throws RyaDAOException {
+    private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
         String version = getVersion();
         if (version == null) {
-            this.add(getVersionRyaStatement());
+            //adding to core Rya tables but not Indexes
+            Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+            Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+            Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+            Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+            bw_spo.addMutations(spo);
+            bw_po.addMutations(po);
+            bw_osp.addMutations(osp);
         }
         //TODO: Do a version check here
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
index 28afce7..c15ba1b 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
@@ -25,6 +25,21 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.FilterFunctionOptimizer;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer;
+import mvm.rya.indexing.pcj.matching.PCJOptimizer;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -47,22 +62,9 @@ import org.apache.log4j.Logger;
 import org.openrdf.model.URI;
 import org.openrdf.model.impl.URIImpl;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.FilterFunctionOptimizer;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import mvm.rya.indexing.accumulo.freetext.Tokenizer;
-import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer;
-import mvm.rya.indexing.pcj.matching.PCJOptimizer;
-
 /**
  * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
  */
@@ -96,6 +98,12 @@ public class ConfigUtils {
     public static final String USE_PCJ = "sc.use_pcj";
     public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
 
+    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+    public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
+    public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+    public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+
     public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
     public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
 
@@ -366,6 +374,17 @@ public class ConfigUtils {
         return conf.getBoolean(USE_OPTIMAL_PCJ, false);
     }
 
+
+    /**
+     * @return The name of the Fluo Application this instance of RYA is
+     *   using to incrementally update PCJs.
+     */
+    //TODO delete this eventually and use Details table
+    public Optional<String> getFluoAppName(Configuration conf) {
+        return Optional.fromNullable(conf.get(FLUO_APP_NAME));
+    }
+
+
     public static boolean getUseMongo(final Configuration conf) {
         return conf.getBoolean(USE_MONGO, false);
     }
@@ -391,6 +410,7 @@ public class ConfigUtils {
 
             if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
                 conf.setPcjOptimizer(PCJOptimizer.class);
+                indexList.add(PrecomputedJoinIndexer.class.getName());
             }
 
             if (getUseGeo(conf)) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
index 6aaf2c4..0324a79 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
@@ -28,6 +28,15 @@ import java.util.Set;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+import mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory;
+
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -41,21 +50,13 @@ import org.openrdf.model.URI;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 
-import mvm.rya.accumulo.experimental.AccumuloIndexer;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.RyaDAO;
-import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
-import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
-import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
-
 /**
  * Updates the state of the Precomputed Join indices that are used by Rya.
  */
 @ParametersAreNonnullByDefault
-public class PrecomputedJoinIndexer implements AccumuloIndexer {
-    private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class);
+public class PrecomputedJoinIndexer extends AbstractAccumuloIndexer {
+    private static final Logger log = Logger
+            .getLogger(PrecomputedJoinIndexer.class);
 
     /**
      * This configuration object must be set before {@link #init()} is invoked.
@@ -64,14 +65,14 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
     private Optional<Configuration> conf = Optional.absent();
 
     /**
-     * The Accumulo Connector that must be used when accessing an Accumulo storage.
-     * This value is provided by {@link #setConnector(Connector)}.
+     * The Accumulo Connector that must be used when accessing an Accumulo
+     * storage. This value is provided by {@link #setConnector(Connector)}.
      */
     private Optional<Connector> accumuloConn = Optional.absent();
 
     /**
-     * Provides access to the {@link Configuration} that was provided to this class
-     * using {@link #setConf(Configuration)}.
+     * Provides access to the {@link Configuration} that was provided to this
+     * class using {@link #setConf(Configuration)}.
      */
     private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() {
         @Override
@@ -92,23 +93,22 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
     };
 
     /**
-     * Creates and grants access to the {@link PrecomputedJoinStorage} that will be used
-     * to interact with the PCJ results that are stored and used by Rya.
+     * Creates and grants access to the {@link PrecomputedJoinStorage} that will
+     * be used to interact with the PCJ results that are stored and used by Rya.
      */
-    private final PrecomputedJoinStorageSupplier pcjStorageSupplier =
-            new PrecomputedJoinStorageSupplier(
-                    configSupplier,
-                    new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier));
+    private final PrecomputedJoinStorageSupplier pcjStorageSupplier = new PrecomputedJoinStorageSupplier(
+            configSupplier, new AccumuloPcjStorageSupplier(configSupplier,
+                    accumuloSupplier));
+
+    private PrecomputedJoinStorage pcjStorage;
 
     /**
-     * Creates and grants access to the {@link PrecomputedJoinUpdater} that will
+     * Creates and grants access to the {@link PrecomputedJoinUpdater}s that will
      * be used to update the state stored within the PCJ tables that are stored
      * in Accumulo.
      */
-    private final PrecomputedJoinUpdaterSupplier pcjUpdaterSupplier =
-            new PrecomputedJoinUpdaterSupplier(
-                    configSupplier,
-                    new FluoPcjUpdaterSupplier(configSupplier));
+    private Supplier<PrecomputedJoinUpdater> updaterSupplier;
+
 
     @Override
     public void setConf(final Configuration conf) {
@@ -127,7 +127,7 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
     @Override
     public void setConnector(final Connector connector) {
         checkNotNull(connector);
-        accumuloConn = Optional.of( connector );
+        accumuloConn = Optional.of(connector);
     }
 
     /**
@@ -135,40 +135,48 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
      */
     @Override
     public void init() {
-        pcjStorageSupplier.get();
-        pcjUpdaterSupplier.get();
+        pcjStorage = pcjStorageSupplier.get();
+        updaterSupplier = new PcjUpdaterSupplierFactory(configSupplier).getSupplier();
+        updaterSupplier.get();
     }
 
     @Override
     public void storeStatement(final RyaStatement statement) throws IOException {
         checkNotNull(statement);
-        storeStatements( Collections.singleton(statement) );
+        storeStatements(Collections.singleton(statement));
     }
 
     @Override
-    public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+    public void storeStatements(final Collection<RyaStatement> statements)
+            throws IOException {
         checkNotNull(statements);
         try {
-            pcjUpdaterSupplier.get().addStatements(statements);
+          updaterSupplier.get().addStatements(statements);
         } catch (final PcjUpdateException e) {
-            throw new IOException("Could not update the PCJs by adding the provided statements.", e);
+            throw new IOException(
+                    "Could not update the PCJs by adding the provided statements.",
+                    e);
         }
     }
 
     @Override
-    public void deleteStatement(final RyaStatement statement) throws IOException {
+    public void deleteStatement(final RyaStatement statement)
+            throws IOException {
         checkNotNull(statement);
         try {
-            pcjUpdaterSupplier.get().deleteStatements( Collections.singleton(statement) );
+            Collection<RyaStatement> statements = Collections.singleton(statement);
+            updaterSupplier.get().deleteStatements(statements);
         } catch (final PcjUpdateException e) {
-            throw new IOException("Could not update the PCJs by removing the provided statement.", e);
+            throw new IOException(
+                    "Could not update the PCJs by removing the provided statement.",
+                    e);
         }
     }
 
     @Override
     public void flush() throws IOException {
         try {
-            pcjUpdaterSupplier.get().flush();
+            updaterSupplier.get().flush();
         } catch (final PcjUpdateException e) {
             throw new IOException("Could not flush the PCJ Updater.", e);
         }
@@ -177,13 +185,13 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
     @Override
     public void close() {
         try {
-            pcjStorageSupplier.get().close();
+            pcjStorage.close();
         } catch (final PCJStorageException e) {
             log.error("Could not close the PCJ Storage instance.", e);
         }
 
         try {
-            pcjUpdaterSupplier.get().close();
+            updaterSupplier.get().close();
         } catch (final PcjUpdateException e) {
             log.error("Could not close the PCJ Updater instance.", e);
         }
@@ -198,48 +206,56 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
     }
 
     /**
-     * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}.
+     * Deletes all data from the PCJ indices that are managed by a
+     * {@link PrecomputedJoinStorage}.
      */
     @Override
     public void purge(final RdfCloudTripleStoreConfiguration configuration) {
-        final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
 
         try {
-            for(final String pcjId : storage.listPcjs()) {
+            for (final String pcjId : pcjStorage.listPcjs()) {
                 try {
-                    storage.purge(pcjId);
-                } catch(final PCJStorageException e) {
-                    log.error("Could not purge the PCJ index with id: " + pcjId, e);
+                    pcjStorage.purge(pcjId);
+                } catch (final PCJStorageException e) {
+                    log.error(
+                            "Could not purge the PCJ index with id: " + pcjId,
+                            e);
                 }
             }
         } catch (final PCJStorageException e) {
-            log.error("Could not purge the PCJ indicies because they could not be listed.", e);
+            log.error(
+                    "Could not purge the PCJ indicies because they could not be listed.",
+                    e);
         }
     }
 
     /**
-     * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}.
+     * Deletes all of the PCJ indices that are managed by
+     * {@link PrecomputedJoinStorage}.
      */
     @Override
     public void dropAndDestroy() {
-        final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
-
         try {
-            for(final String pcjId : storage.listPcjs()) {
+            for (final String pcjId : pcjStorage.listPcjs()) {
                 try {
-                    storage.dropPcj(pcjId);
-                } catch(final PCJStorageException e) {
-                    log.error("Could not delete the PCJ index with id: " + pcjId, e);
+                    pcjStorage.dropPcj(pcjId);
+                } catch (final PCJStorageException e) {
+                    log.error("Could not delete the PCJ index with id: "
+                            + pcjId, e);
                 }
             }
-        } catch(final PCJStorageException e) {
-            log.error("Could not delete the PCJ indicies because they could not be listed.", e);
+        } catch (final PCJStorageException e) {
+            log.error(
+                    "Could not delete the PCJ indicies because they could not be listed.",
+                    e);
         }
     }
 
     @Override
-    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
-        // We do not need to use the writer that also writes to the core RYA tables.
+    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer)
+            throws IOException {
+        // We do not need to use the writer that also writes to the core RYA
+        // tables.
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
index c56f574..3c76601 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -22,14 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 
 import com.google.common.base.Optional;
 
-import mvm.rya.api.persist.index.RyaSecondaryIndexer;
-
 /**
  * Inspects the {@link Configuration} object that is provided to all instances
  * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer}
@@ -55,7 +56,7 @@ public class PrecomputedJoinIndexerConfig {
         /**
          * Incrementally updates the PCJs is pseudo-realtime new adds/deletes are encountered.
          */
-        FLUO;
+        FLUO, NO_UPDATE;
     }
 
     // Indicates which implementation of PrecomputedJoinStorage to use.
@@ -63,6 +64,7 @@ public class PrecomputedJoinIndexerConfig {
 
     // Indicates which implementation of PrecomputedJoinUpdater to use.
     public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+    public static final String USE_PCJ_FLUO_UPDATER = ConfigUtils.USE_PCJ_FLUO_UPDATER;
 
     // The configuration object that is provided to Secondary Indexing implementations.
     private final Configuration config;
@@ -104,6 +106,13 @@ public class PrecomputedJoinIndexerConfig {
         return Optional.fromNullable(updaterType);
     }
 
+
+
+    public boolean getUseFluoUpdater() {
+    	return config.getBoolean(USE_PCJ_FLUO_UPDATER, false);
+    }
+
+
     /**
      * @return The configuration object that has been wrapped.
      */

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
index 4378a4a..2a34a82 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
@@ -19,20 +19,19 @@
 package mvm.rya.indexing.external.fluo;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import mvm.rya.indexing.accumulo.ConfigUtils;
 
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Optional;
 
-import mvm.rya.indexing.accumulo.ConfigUtils;
-
 /**
  * Configuration values required to initialize a {@link FluoPcjUpdater}.
  */
 public final class FluoPcjUpdaterConfig {
 
     // Defines which Fluo application is running for this instance of Rya.
-    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+    public static final String FLUO_APP_NAME = ConfigUtils.FLUO_APP_NAME;
 
     // Values that define which Accumulo instance hosts the Fluo application's table.
     public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
index ed202f4..61de078 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
@@ -26,25 +26,26 @@ import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERN
 import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS;
 import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME;
 import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.config.FluoConfiguration;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
-
 /**
  * Creates instances of {@link FluoPcjUpdater} using the values found in a {@link Configuration}.
  */
 @ParametersAreNonnullByDefault
-public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> {
+public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
 
     private final Supplier<Configuration> configSupplier;
 
@@ -66,7 +67,7 @@ public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> {
         final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
 
         final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType();
-        checkArgument(updaterType.isPresent() && (updaterType.get() == PrecomputedJoinUpdaterType.FLUO),
+        checkArgument(updaterType.isPresent() && updaterType.get() == PrecomputedJoinUpdaterType.FLUO,
                 "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE +
                 "' value be set to '" + PrecomputedJoinUpdaterType.FLUO + "'.");
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
new file mode 100644
index 0000000..66a6b24
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+/**
+ * A NoOp updater (which does nothing) to be used by {@link PrecomputedJoinIndexer} if neither Batch nor
+ * {@link FluoPcjUpdater} is specified by the user to update Precomputed Joins.
+ *
+ */
+public class NoOpUpdater implements PrecomputedJoinUpdater {
+
+    @Override
+    public void addStatements(Collection<RyaStatement> statements)
+            throws PcjUpdateException {
+    }
+
+    @Override
+    public void deleteStatements(Collection<RyaStatement> statements)
+            throws PcjUpdateException {
+    }
+
+    @Override
+    public void flush() throws PcjUpdateException {
+    }
+
+    @Override
+    public void close() throws PcjUpdateException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
new file mode 100644
index 0000000..6831353
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Supplier;
+
+/**
+ * A {@link Supplier} for {@link NoOpUdater}s.  This Supplier is used by
+ * {@link PrecomputedJoinIndexer} when no update strategy is specified by the user.
+ *
+ */
+public class NoOpUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
+
+    @Override
+    public NoOpUpdater get() {
+        return new NoOpUpdater();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
new file mode 100644
index 0000000..0250b6d
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * A factory for {@link Supplier}s used by {@link PrecomputedJoinIndexer} to
+ * get all update strategies for precomputed joins for a given Rya instance.
+ *
+ */
+public class PcjUpdaterSupplierFactory {
+
+    private Supplier<Configuration> configSupplier;
+
+    public PcjUpdaterSupplierFactory(Supplier<Configuration> configSupplier) {
+        this.configSupplier = configSupplier;
+    }
+
+    public Supplier<PrecomputedJoinUpdater> getSupplier() {
+
+        PrecomputedJoinIndexerConfig config = new PrecomputedJoinIndexerConfig(configSupplier.get());
+        //TODO this should not be read from the config.  Instead,
+        //this information should be retrieved from the RyaDetails table
+        if(config.getUseFluoUpdater()) {
+            return Suppliers.memoize(new FluoPcjUpdaterSupplier(configSupplier));
+        }
+        else {
+            return Suppliers.memoize(new NoOpUpdaterSupplier());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6ef282f..6ca8cd7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,25 +1,15 @@
 <?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project 
-    xmlns="http://maven.apache.org/POM/4.0.0" 
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    you under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <parent>
@@ -27,13 +17,13 @@ under the License.
         <artifactId>rya.pcj.fluo.parent</artifactId>
         <version>3.2.10-SNAPSHOT</version>
     </parent>
-    
+
     <modelVersion>4.0.0</modelVersion>
     <artifactId>rya.pcj.fluo.integration</artifactId>
-    
+
     <name>Apache Rya PCJ Fluo Integration Tests</name>
     <description>Integration tests for the Rya Fluo application.</description>
-    
+
     <dependencies>
         <!-- Rya Runtime Dependencies. -->
         <dependency>
@@ -48,7 +38,10 @@ under the License.
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.pcj.fluo.client</artifactId>
         </dependency>
-    
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
         <!-- Testing dependencies. -->
         <dependency>
             <groupId>io.fluo</groupId>
@@ -60,5 +53,9 @@ under the License.
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+         <dependency>
+            <groupId>io.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 618cab9..154156f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -20,8 +20,23 @@ package org.apache.rya.indexing.pcj.fluo;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import io.fluo.api.client.FluoAdmin.TableExistsException;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ObserverConfiguration;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.mini.MiniFluo;
+
 import java.io.File;
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +44,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.UUID;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -37,6 +66,7 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
@@ -57,112 +87,91 @@ import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
 import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
 
 import com.google.common.io.Files;
 
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-import io.fluo.api.mini.MiniFluo;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.rdftriplestore.RdfCloudTripleStore;
-import mvm.rya.rdftriplestore.RyaSailRepository;
-
 /**
- * Integration tests that ensure the Fluo application processes PCJs results correctly.
+ * Integration tests that ensure the Fluo application processes PCJs results
+ * correctly.
  * <p>
  * This class is being ignored because it doesn't contain any unit tests.
  */
 public abstract class ITBase {
     private static final Logger log = Logger.getLogger(ITBase.class);
 
-    public static final String USE_MOCK_INSTANCE = ".useMockInstance";
-    public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
-    public static final String CLOUDBASE_USER = "sc.cloudbase.username";
-    public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
-
     protected static final String RYA_TABLE_PREFIX = "demo_";
+    
+    protected static final String ACCUMULO_USER = "root";
+    protected static final String ACCUMULO_PASSWORD = "password";
 
     // Rya data store and connections.
-    protected MiniAccumuloCluster accumulo = null;
-    protected static Connector accumuloConn = null;
     protected RyaSailRepository ryaRepo = null;
     protected RepositoryConnection ryaConn = null;
+    
 
+    // Mini Accumulo Cluster
+    protected MiniAccumuloCluster cluster;
+    protected static Connector accumuloConn = null;
+    protected String instanceName = null;
+    protected String zookeepers = null;
+    
     // Fluo data store and connections.
     protected MiniFluo fluo = null;
     protected FluoClient fluoClient = null;
+    protected final String appName = "IntegrationTests";
 
     @Before
-    public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException {
-        // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it.
-        accumulo = startMiniAccumulo();
-
-        // Setup the Rya library to use the Mini Accumulo.
-        ryaRepo = setupRya(accumulo);
-        ryaConn = ryaRepo.getConnection();
-
+    public void setupMiniResources()
+            throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException,
+            RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException {
+    	// Initialize the Mini Accumulo that will be used to host Rya and Fluo.
+    	setupMiniAccumulo();
+    	
         // Initialize the Mini Fluo that will be used to store created queries.
         fluo = startMiniFluo();
-        fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() );
-    }
+        fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
 
+        // Initialize the Rya that will be used by the tests.
+        ryaRepo = setupRya(ACCUMULO_USER, ACCUMULO_PASSWORD, instanceName, zookeepers, appName);
+        ryaConn = ryaRepo.getConnection();
+    }
+    
     @After
     public void shutdownMiniResources() {
-        if(ryaConn != null) {
+    	// TODO shutdown the cluster
+    	
+        if (ryaConn != null) {
             try {
                 log.info("Shutting down Rya Connection.");
                 ryaConn.close();
                 log.info("Rya Connection shut down.");
-            } catch(final Exception e) {
+            } catch (final Exception e) {
                 log.error("Could not shut down the Rya Connection.", e);
             }
         }
 
-        if(ryaRepo != null) {
+        if (ryaRepo != null) {
             try {
                 log.info("Shutting down Rya Repo.");
                 ryaRepo.shutDown();
                 log.info("Rya Repo shut down.");
-            } catch(final Exception e) {
+            } catch (final Exception e) {
                 log.error("Could not shut down the Rya Repo.", e);
             }
         }
-
-        if(accumulo != null) {
-            try {
-                log.info("Shutting down the Mini Accumulo being used as a Rya store.");
-                accumulo.stop();
-                log.info("Mini Accumulo being used as a Rya store shut down.");
-            } catch(final Exception e) {
-                log.error("Could not shut down the Mini Accumulo.", e);
-            }
-        }
-
-        if(fluoClient != null) {
+        
+        if (fluoClient != null) {
             try {
                 log.info("Shutting down Fluo Client.");
                 fluoClient.close();
                 log.info("Fluo Client shut down.");
-            } catch(final Exception e) {
+            } catch (final Exception e) {
                 log.error("Could not shut down the Fluo Client.", e);
             }
         }
 
-        if(fluo != null) {
+        if (fluo != null) {
             try {
                 log.info("Shutting down Mini Fluo.");
                 fluo.close();
@@ -171,28 +180,44 @@ public abstract class ITBase {
                 log.error("Could not shut down the Mini Fluo.", e);
             }
         }
+        
+        if(cluster != null) {
+            try {
+                log.info("Shutting down the Mini Accumulo being used as a Rya store.");
+                cluster.stop();
+                log.info("Mini Accumulo being used as a Rya store shut down.");
+            } catch(final Exception e) {
+                log.error("Could not shut down the Mini Accumulo.", e);
+            }
+        }
     }
 
     /**
-     * A helper fuction for creating a {@link BindingSet} from an array of {@link Binding}s.
+     * A helper fuction for creating a {@link BindingSet} from an array of
+     * {@link Binding}s.
      *
-     * @param bindings - The bindings to include in the set. (not null)
+     * @param bindings
+     *            - The bindings to include in the set. (not null)
      * @return A {@link BindingSet} holding the bindings.
      */
     protected static BindingSet makeBindingSet(final Binding... bindings) {
         final MapBindingSet bindingSet = new MapBindingSet();
-        for(final Binding binding : bindings) {
+        for (final Binding binding : bindings) {
             bindingSet.addBinding(binding);
         }
         return bindingSet;
     }
 
     /**
-     * A helper function for creating a {@link RyaStatement} that represents a Triple.
+     * A helper function for creating a {@link RyaStatement} that represents a
+     * Triple.
      *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
+     * @param subject
+     *            - The Subject of the Triple. (not null)
+     * @param predicate
+     *            - The Predicate of the Triple. (not null)
+     * @param object
+     *            - The Object of the Triple. (not null)
      * @return A Triple as a {@link RyaStatement}.
      */
     protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) {
@@ -200,44 +225,48 @@ public abstract class ITBase {
         checkNotNull(predicate);
         checkNotNull(object);
 
-        final RyaStatementBuilder builder = RyaStatement.builder()
-            .setSubject( new RyaURI(subject) )
-            .setPredicate( new RyaURI(predicate) );
+        final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject))
+                .setPredicate(new RyaURI(predicate));
 
-        if(object.startsWith("http://")) {
-            builder.setObject(new RyaURI(object) );
+        if (object.startsWith("http://")) {
+            builder.setObject(new RyaURI(object));
         } else {
-            builder.setObject( new RyaType(object) );
+            builder.setObject(new RyaType(object));
         }
 
         return builder.build();
     }
 
     /**
-     * A helper function for creating a {@link RyaStatement} that represents a Triple.
+     * A helper function for creating a {@link RyaStatement} that represents a
+     * Triple.
      *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
+     * @param subject
+     *            - The Subject of the Triple. (not null)
+     * @param predicate
+     *            - The Predicate of the Triple. (not null)
+     * @param object
+     *            - The Object of the Triple. (not null)
      * @return A Triple as a {@link RyaStatement}.
      */
     protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) {
         checkNotNull(subject);
         checkNotNull(predicate);
 
-        return RyaStatement.builder()
-                .setSubject(new RyaURI(subject))
-                .setPredicate(new RyaURI(predicate))
-                .setObject( new RyaType(XMLSchema.INT, "" + object) )
-                .build();
+        return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate))
+                .setObject(new RyaType(XMLSchema.INT, "" + object)).build();
     }
 
     /**
-     * A helper function for creating a Sesame {@link Statement} that represents a Triple..
+     * A helper function for creating a Sesame {@link Statement} that represents
+     * a Triple..
      *
-     * @param subject - The Subject of the Triple. (not null)
-     * @param predicate - The Predicate of the Triple. (not null)
-     * @param object - The Object of the Triple. (not null)
+     * @param subject
+     *            - The Subject of the Triple. (not null)
+     * @param predicate
+     *            - The Predicate of the Triple. (not null)
+     * @param object
+     *            - The Object of the Triple. (not null)
      * @return A Triple as a {@link Statement}.
      */
     protected static Statement makeStatement(final String subject, final String predicate, final String object) {
@@ -253,14 +282,17 @@ public abstract class ITBase {
      * Fetches the binding sets that are the results of a specific SPARQL query
      * from the Fluo table.
      *
-     * @param fluoClient- A connection to the Fluo table where the results reside. (not null)
-     * @param sparql - This query's results will be fetched. (not null)
+     * @param fluoClient-
+     *            A connection to the Fluo table where the results reside. (not
+     *            null)
+     * @param sparql
+     *            - This query's results will be fetched. (not null)
      * @return The binding sets for the query's results.
      */
     protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) {
         final Set<BindingSet> bindingSets = new HashSet<>();
 
-        try(Snapshot snapshot = fluoClient.newSnapshot()) {
+        try (Snapshot snapshot = fluoClient.newSnapshot()) {
             final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString();
 
             // Fetch the query's variable order.
@@ -269,12 +301,13 @@ public abstract class ITBase {
 
             // Fetch the Binding Sets for the query.
             final ScannerConfiguration scanConfig = new ScannerConfiguration();
-            scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
+            scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(),
+                    FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
 
             BindingSetStringConverter converter = new BindingSetStringConverter();
 
             final RowIterator rowIter = snapshot.get(scanConfig);
-            while(rowIter.hasNext()) {
+            while (rowIter.hasNext()) {
                 final Entry<Bytes, ColumnIterator> row = rowIter.next();
                 final String bindingSetString = row.getValue().next().getValue().toString();
                 final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
@@ -285,77 +318,94 @@ public abstract class ITBase {
         return bindingSets;
     }
 
-    /**
-     * Setup a Mini Accumulo cluster that uses a temporary directory to store its data.
-     *
-     * @return A Mini Accumulo cluster.
-     */
-    private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
-        final File miniDataDir = Files.createTempDir();
-
-        // Setup and start the Mini Accumulo.
-        final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password");
-        accumulo.start();
-
-        // Store a connector to the Mini Accumulo.
-        final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
-        accumuloConn = instance.getConnector("root", new PasswordToken("password"));
-
-        return accumulo;
+    private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
+    	File miniDataDir = Files.createTempDir();
+    	
+    	// Setup and start the Mini Accumulo.
+    	MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD);
+    	cluster = new MiniAccumuloCluster(cfg);
+    	cluster.start();
+    	
+    	// Store a connector to the Mini Accumulo.
+    	instanceName = cluster.getInstanceName();
+    	zookeepers = cluster.getZooKeepers();
+    	
+    	Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
+    	accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD));
     }
 
-    /**
-     * Format a Mini Accumulo to be a Rya repository.
-     *
-     * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null)
-     * @return The Rya repository sitting on top of the Mini Accumulo.
-     */
-    private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException {
-        checkNotNull(accumulo);
-
-        // Setup the Rya Repository that will be used to create Repository Connections.
-        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
-        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
-        crdfdao.setConnector(accumuloConn);
+     /**
+      * Sets up a Rya instance
+      *
+      * @param user
+      * @param password
+      * @param instanceName
+      * @param zookeepers
+      * @param appName
+      * @return
+      * @throws AccumuloException
+      * @throws AccumuloSecurityException
+      * @throws RepositoryException
+      * @throws RyaDAOException
+      * @throws NumberFormatException
+      * @throws UnknownHostException
+      * @throws InferenceEngineException
+      */
+    protected static RyaSailRepository setupRya(String user, String password, String instanceName, String zookeepers, String appName)
+            throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
+            NumberFormatException, UnknownHostException, InferenceEngineException {
+
+        checkNotNull(user);
+        checkNotNull(password);
+        checkNotNull(instanceName);
+        checkNotNull(zookeepers);
+        checkNotNull(appName);
 
         // Setup Rya configuration values.
         final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("demo_");
+        conf.setTablePrefix(RYA_TABLE_PREFIX);
         conf.setDisplayQueryPlan(true);
-
-        conf.setBoolean(USE_MOCK_INSTANCE, true);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX);
-        conf.set(CLOUDBASE_USER, "root");
-        conf.set(CLOUDBASE_PASSWORD, "password");
-        conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName());
-
-        crdfdao.setConf(conf);
-        ryaStore.setRyaDAO(crdfdao);
-
-        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
+        conf.set(ConfigUtils.CLOUDBASE_USER, user);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, password);
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, appName);
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+
+        Sail sail = RyaSailFactory.getInstance(conf);
+        final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
         ryaRepo.initialize();
 
         return ryaRepo;
     }
 
     /**
-     * Override this method to provide an output configuration to the Fluo application.
+     * Override this method to provide an output configuration to the Fluo
+     * application.
      * <p>
      * Returns an empty map by default.
      *
-     * @return The parameters that will be passed to {@link QueryResultObserver} at startup.
+     * @return The parameters that will be passed to {@link QueryResultObserver}
+     *         at startup.
      */
     protected Map<String, String> makeExportParams() {
         return new HashMap<>();
     }
 
     /**
-     * Setup a Mini Fluo cluster that uses a temporary directory to store its data.ll
+     * Setup a Mini Fluo cluster that uses a temporary directory to store its
+     * data.ll
      *
      * @return A Mini Fluo cluster.
      */
-    protected MiniFluo startMiniFluo() {
-        final File miniDataDir = Files.createTempDir();
+    protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
+//        final File miniDataDir = Files.createTempDir();
 
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverConfiguration> observers = new ArrayList<>();
@@ -364,18 +414,29 @@ public abstract class ITBase {
         observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
         observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
 
-        // Provide export parameters child test classes may provide to the export observer.
-        final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName());
-        exportObserverConfig.setParameters( makeExportParams() );
+        // Provide export parameters child test classes may provide to the
+        // export observer.
+        final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(
+                QueryResultObserver.class.getName());
+        exportObserverConfig.setParameters(makeExportParams());
         observers.add(exportObserverConfig);
 
         // Configure how the mini fluo will run.
         final FluoConfiguration config = new FluoConfiguration();
-        config.setApplicationName("IntegrationTests");
-        config.setMiniDataDir(miniDataDir.getAbsolutePath());
+        config.setMiniStartAccumulo(false);
+        config.setAccumuloInstance(instanceName);
+        config.setAccumuloUser(ACCUMULO_USER);
+        config.setAccumuloPassword(ACCUMULO_PASSWORD);
+        config.setInstanceZookeepers(zookeepers + "/fluo");
+        config.setAccumuloZookeepers(zookeepers);
+        
+        config.setApplicationName(appName);
+        config.setAccumuloTable("fluo" + appName);
+        
         config.addObservers(observers);
 
-        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
-        return miniFluo;
+        FluoFactory.newAdmin(config).initialize( 
+        		new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+        return FluoFactory.newMiniFluo(config);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 566d2d2..124f5a9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -31,6 +31,9 @@ import org.junit.Test;
 import com.google.common.base.Optional;
 import com.google.common.io.Files;
 
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import io.fluo.api.client.FluoAdmin.TableExistsException;
 import io.fluo.api.client.FluoFactory;
 import io.fluo.api.config.FluoConfiguration;
 import io.fluo.api.config.ObserverConfiguration;
@@ -48,24 +51,34 @@ public class CountStatementsIT extends ITBase {
      * statements are inserted as part of the test will not be consumed.
      *
      * @return A Mini Fluo cluster.
+     * @throws TableExistsException 
      */
     @Override
-    protected MiniFluo startMiniFluo() {
-        final File miniDataDir = Files.createTempDir();
-
+    protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverConfiguration> observers = new ArrayList<>();
 
         // Configure how the mini fluo will run.
         final FluoConfiguration config = new FluoConfiguration();
-        config.setApplicationName("IntegrationTests");
-        config.setMiniDataDir(miniDataDir.getAbsolutePath());
+        config.setMiniStartAccumulo(false);
+        config.setAccumuloInstance(instanceName);
+        config.setAccumuloUser(ACCUMULO_USER);
+        config.setAccumuloPassword(ACCUMULO_PASSWORD);
+        config.setInstanceZookeepers(zookeepers + "/fluo");
+        config.setAccumuloZookeepers(zookeepers);
+        
+        config.setApplicationName(appName);
+        config.setAccumuloTable("fluo" + appName);
+        
         config.addObservers(observers);
 
+        FluoFactory.newAdmin(config).initialize( 
+                              new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
         final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
         return miniFluo;
     }
 
+
     @Test
     public void test() {
         // Insert some Triples into the Fluo app.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 1ebc29b..873b78e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -19,6 +19,8 @@
 package org.apache.rya.indexing.pcj.fluo.integration;
 
 import static org.junit.Assert.assertEquals;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,6 +28,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import mvm.rya.api.domain.RyaStatement;
+
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -54,10 +58,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
-import mvm.rya.api.domain.RyaStatement;
-
 /**
  * Performs integration tests over the Fluo application geared towards Rya PCJ exporting.
  * <p>
@@ -77,10 +77,10 @@ public class RyaExportIT extends ITBase {
 
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
         ryaParams.setExportToRya(true);
-        ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
-        ryaParams.setZookeeperServers(accumulo.getZooKeepers());
-        ryaParams.setExporterUsername("root");
-        ryaParams.setExporterPassword("password");
+        ryaParams.setAccumuloInstanceName(instanceName);
+        ryaParams.setZookeeperServers(zookeepers);
+        ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
+        ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
 
         return params;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
new file mode 100644
index 0000000..68fd842
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import io.fluo.api.client.FluoClient;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.repository.RepositoryConnection;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * This test ensures that the correct updates are pushed by Fluo
+ * to the external PCJ table as triples are added to Rya through
+ * the {@link RepositoryConnection}.  The key difference between these
+ * tests and those in {@link InputIT} is that streaming triples are added through
+ * the RepositoryConnection and not through the {@link FluoClient}.  These tests are
+ * designed to verify that the {@link AccumuloRyaDAO} has been integrated
+ * with the {@link PrecomputedJoinIndexer} and that the associated {@link PrecomputedJoinUpdater} updates
+ * Fluo accordingly.
+ *
+ */
+
+public class RyaInputIncrementalUpdateIT extends ITBase {
+
+    /**
+     * Ensure historic matches are included in the result.
+     */
+    @Test
+    public void streamResultsThroughRya() throws Exception {
+
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
+                + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Bob", "http://talksTo", "http://Eve"),
+                makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+
+                makeStatement("http://Eve", "http://helps", "http://Kevin"),
+
+                makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+
+        // The expected results of the SPARQL query once the PCJ has been
+        // computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+                new HashSet<VariableOrder>(), sparql);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+
+        // Load the historic data into Rya.
+        for (final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        fluo.waitForObservers();
+
+        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Simulates the case where a Triple is added to Rya, a new query that
+     * includes that triple as a historic match is inserted into Fluo, and then
+     * some new triple that matches the query is streamed into Fluo. The query's
+     * results must include both the historic result and the newly streamed
+     * result.
+     */
+    @Test
+    public void historicThenStreamedResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
+                + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Joe", "http://worksAt", "http://Chipotle"));
+
+        // Triples that will be streamed into Fluo after the PCJ has been
+        final Set<Statement> streamedTriples = Sets.newHashSet(
+                makeStatement("http://Frank", "http://talksTo", "http://Eve"),
+                makeStatement("http://Joe", "http://talksTo", "http://Eve"),
+                makeStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+        // Load the historic data into Rya.
+        for (final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+                new HashSet<VariableOrder>(), sparql);
+        fluo.waitForObservers();
+
+        // Load the streaming data into Rya.
+        for (final Statement triple : streamedTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe"))));
+
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    @Test
+    public void historicAndStreamMultiVariables() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql = "SELECT ?x ?y " + "WHERE { " + "?x <http://talksTo> ?y. "
+                + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Joe", "http://worksAt", "http://Chipotle"));
+
+        // Triples that will be streamed into Fluo after the PCJ has been
+        final Set<Statement> streamedTriples = Sets.newHashSet(
+                makeStatement("http://Frank", "http://talksTo", "http://Betty"),
+                makeStatement("http://Joe", "http://talksTo", "http://Alice"),
+                makeStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+        // Load the historic data into Rya.
+        for (final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+                new HashSet<VariableOrder>(), sparql);
+        fluo.waitForObservers();
+
+        // Load the streaming data into Rya.
+        for (final Statement triple : streamedTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")), new BindingImpl("y", new URIImpl("http://Eve"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")), new BindingImpl("y", new URIImpl("http://Betty"))));
+        expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")), new BindingImpl("y", new URIImpl("http://Alice"))));
+
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 2e00e6b..95228b8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.visibility;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,6 +29,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
@@ -68,11 +73,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-
 public class PcjVisibilityIT extends ITBase {
 
     private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
@@ -86,10 +86,10 @@ public class PcjVisibilityIT extends ITBase {
 
         final RyaExportParameters ryaParams = new RyaExportParameters(params);
         ryaParams.setExportToRya(true);
-        ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
-        ryaParams.setZookeeperServers(accumulo.getZooKeepers());
-        ryaParams.setExporterUsername("root");
-        ryaParams.setExporterPassword("password");
+        ryaParams.setAccumuloInstanceName(instanceName);
+        ryaParams.setZookeeperServers(zookeepers);
+        ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
+        ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
 
         return params;
     }