You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 16:42:48 UTC

[6/6] incubator-rya git commit: RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.

RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/14073a23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/14073a23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/14073a23

Branch: refs/heads/develop
Commit: 14073a23fab3c8c9dca37747d732b83793aee3a3
Parents: 9efae3e
Author: Kevin Chilton <ke...@localhost.localdomain>
Authored: Thu Apr 14 10:34:40 2016 -0400
Committer: Kevin Chilton <ke...@parsons.com>
Committed: Fri May 13 11:26:19 2016 -0400

----------------------------------------------------------------------
 extras/indexing/pom.xml                         |  16 +-
 .../accumulo/precompQuery/AccumuloPcjQuery.java |  67 +-
 .../mvm/rya/indexing/accumulo/ConfigUtils.java  | 226 +++--
 .../indexing/accumulo/VisibilityBindingSet.java |  90 --
 .../indexing/external/BindingSetDecorator.java  | 105 ---
 .../indexing/external/PrecompJoinOptimizer.java |  30 +-
 .../external/PrecomputedJoinIndexer.java        | 255 ++++++
 .../external/PrecomputedJoinIndexerConfig.java  | 113 +++
 .../PrecomputedJoinStorageSupplier.java         |  78 ++
 .../PrecomputedJoinUpdaterSupplier.java         |  78 ++
 .../external/accumulo/AccumuloPcjStorage.java   |  96 +++
 .../accumulo/AccumuloPcjStorageConfig.java      |  57 ++
 .../accumulo/AccumuloPcjStorageSupplier.java    |  77 ++
 .../indexing/external/fluo/FluoPcjUpdater.java  |  89 ++
 .../external/fluo/FluoPcjUpdaterConfig.java     | 120 +++
 .../external/fluo/FluoPcjUpdaterSupplier.java   |  93 +++
 .../external/tupleSet/AccumuloIndexSet.java     |  43 +-
 .../tupleSet/AccumuloPcjSerializer.java         | 187 -----
 .../external/tupleSet/BindingSetConverter.java  | 108 ---
 .../tupleSet/BindingSetStringConverter.java     | 149 ----
 .../indexing/external/tupleSet/PcjTables.java   | 833 -------------------
 .../VisibilityBindingSetStringConverter.java    |  62 --
 .../AccumuloConstantPcjIntegrationTest.java     |  10 +-
 .../external/AccumuloPcjIntegrationTest.java    |  16 +-
 .../indexing/external/PCJOptionalTestIT.java    |  18 +-
 .../external/PcjIntegrationTestingUtil.java     |  37 +-
 .../PrecompJoinOptimizerIntegrationTest.java    |  10 +-
 .../PrecomputedJoinStorageSupplierTest.java     |  79 ++
 .../PrecomputedJoinUpdaterSupplierTest.java     |  79 ++
 .../external/tupleSet/AccumuloIndexSetTest.java |  40 +-
 .../tupleSet/AccumuloPcjSerialzerTest.java      | 173 ----
 .../tupleSet/BindingSetStringConverterTest.java | 310 -------
 .../tupleSet/PcjTablesIntegrationTests.java     | 438 ----------
 .../external/tupleSet/PcjTablesTests.java       |  84 --
 ...VisibilityBindingSetStringConverterTest.java | 132 ---
 .../src/main/java/RyaDirectExample.java         |   6 +-
 extras/pom.xml                                  |   1 +
 extras/rya.indexing.pcj/.gitignore              |   1 +
 extras/rya.indexing.pcj/pom.xml                 |  84 ++
 .../rya/indexing/pcj/storage/PcjException.java  |  47 ++
 .../rya/indexing/pcj/storage/PcjMetadata.java   | 113 +++
 .../pcj/storage/PrecomputedJoinStorage.java     | 128 +++
 .../storage/accumulo/AccumuloPcjSerializer.java | 185 ++++
 .../storage/accumulo/BindingSetConverter.java   | 106 +++
 .../storage/accumulo/BindingSetDecorator.java   | 105 +++
 .../accumulo/BindingSetStringConverter.java     | 148 ++++
 .../storage/accumulo/PcjTableNameFactory.java   |  73 ++
 .../pcj/storage/accumulo/PcjTables.java         | 653 +++++++++++++++
 .../storage/accumulo/PcjVarOrderFactory.java    |  37 +
 .../storage/accumulo/ShiftVarOrderFactory.java  |  55 ++
 .../pcj/storage/accumulo/VariableOrder.java     | 117 +++
 .../storage/accumulo/VisibilityBindingSet.java  |  88 ++
 .../VisibilityBindingSetStringConverter.java    |  59 ++
 .../pcj/update/PrecomputedJoinUpdater.java      | 118 +++
 .../accumulo/AccumuloPcjSerialzerTest.java      | 175 ++++
 .../accumulo/BindingSetStringConverterTest.java | 311 +++++++
 .../accumulo/PcjTablesIntegrationTests.java     | 546 ++++++++++++
 .../pcj/storage/accumulo/PcjTablesTests.java    |  84 ++
 ...VisibilityBindingSetStringConverterTest.java | 133 +++
 extras/rya.pcj.fluo/pcj.fluo.api/pom.xml        |   4 +
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    |  12 +-
 .../indexing/pcj/fluo/api/GetPcjMetadata.java   |   6 +-
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |   8 +-
 .../pcj/fluo/app/FilterResultUpdater.java       |   8 +-
 .../pcj/fluo/app/JoinResultUpdater.java         |  10 +-
 .../pcj/fluo/app/QueryResultUpdater.java        |   8 +-
 .../app/export/IncrementalResultExporter.java   |   3 +-
 .../fluo/app/export/rya/RyaResultExporter.java  |   6 +-
 .../export/rya/RyaResultExporterFactory.java    |   2 +-
 .../fluo/app/observers/BindingSetUpdater.java   |   4 +-
 .../pcj/fluo/app/observers/FilterObserver.java  |   6 +-
 .../pcj/fluo/app/observers/JoinObserver.java    |   6 +-
 .../fluo/app/observers/QueryResultObserver.java |   6 +-
 .../app/observers/StatementPatternObserver.java |   6 +-
 .../pcj/fluo/app/observers/TripleObserver.java  |   6 +-
 .../pcj/fluo/app/query/CommonNodeMetadata.java  |   3 +-
 .../pcj/fluo/app/query/FilterMetadata.java      |   3 +-
 .../fluo/app/query/FluoQueryMetadataDAO.java    |   2 +-
 .../pcj/fluo/app/query/JoinMetadata.java        |   3 +-
 .../pcj/fluo/app/query/QueryMetadata.java       |   3 +-
 .../fluo/app/query/SparqlFluoQueryBuilder.java  |   3 +-
 .../app/query/StatementPatternMetadata.java     |   3 +-
 .../pcj/fluo/app/LeftOuterJoinTest.java         |   3 +-
 .../indexing/pcj/fluo/app/NaturalJoinTest.java  |   3 +-
 .../fluo/client/command/ListQueriesCommand.java |   2 +-
 .../fluo/client/command/NewQueryCommand.java    |   2 +-
 .../fluo/client/util/ParsedQueryRequest.java    |   3 +-
 .../fluo/client/util/PcjMetadataRenderer.java   |   5 +-
 .../pcj/fluo/client/ParsedQueryRequestTest.java |   3 +-
 .../fluo/client/PcjMetadataRendererTest.java    |   5 +-
 .../rya/indexing/pcj/fluo/demo/DemoDriver.java  |  14 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |  12 +-
 .../apache/rya/indexing/pcj/fluo/ITBase.java    |  18 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |   7 +-
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   4 +-
 .../indexing/pcj/fluo/api/ListQueryIdsIT.java   |   2 +-
 .../fluo/app/query/FluoQueryMetadataDAOIT.java  |   2 +-
 .../indexing/pcj/fluo/integration/InputIT.java  |   2 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  |   2 +-
 .../pcj/fluo/integration/RyaExportIT.java       |  12 +-
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |  20 +-
 extras/rya.pcj.fluo/pom.xml                     |  32 -
 pom.xml                                         |  38 +
 103 files changed, 4996 insertions(+), 3087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 53e516d..d819199 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -76,16 +76,24 @@ under the License.
             <artifactId>geomesa-accumulo-datastore</artifactId>
         </dependency>
 
+        <!-- PCJ Indexing -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing.pcj</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.api</artifactId>
+        </dependency>
+        
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        
         <dependency>
-            <groupId>org.apache.accumulo</groupId>
-            <artifactId>accumulo-minicluster</artifactId>
-            <version>${accumulo.version}</version>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
index dd1e9c9..f6b7819 100644
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
@@ -1,5 +1,32 @@
 package mvm.rya.accumulo.precompQuery;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -24,38 +51,10 @@ package mvm.rya.accumulo.precompQuery;
  */
 import info.aduna.iteration.CloseableIteration;
 import info.aduna.iteration.Iteration;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
 import mvm.rya.api.resolver.RyaTypeResolverException;
 import mvm.rya.indexing.PcjQuery;
 import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
-import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
 import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * This class encapsulates how pre-computed join tables are used during query
@@ -71,10 +70,10 @@ import com.google.common.collect.Sets;
  */
 public class AccumuloPcjQuery implements PcjQuery {
 	private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-	
+
 	private final Connector accCon;
 	private final String tableName;
-	
+
 	public AccumuloPcjQuery(Connector accCon, String tableName) {
 		this.accCon = accCon;
 		this.tableName = tableName;
@@ -324,7 +323,7 @@ public class AccumuloPcjQuery implements PcjQuery {
 				bSet.addBinding(var, bs.getBinding(var).getValue());
 			}
 		}
-		
+
 		return converter.convert(bSet, new VariableOrder(prefixVars));
 	}
 
@@ -333,17 +332,17 @@ public class AccumuloPcjQuery implements PcjQuery {
 	 * @param key - Accumulo key obtained from scan
 	 * @param tableVarMap - map that associated query variables and table variables
 	 * @return - BindingSet without values associated with constant constraints
-	 * @throws BindingSetConversionException 
+	 * @throws BindingSetConversionException
 	 */
 	private static BindingSet getBindingSetWithoutConstants(Key key,
 			Map<String, String> tableVarMap) throws BindingSetConversionException {
 		final byte[] row = key.getRow().getBytes();
 		final String[] varOrder = key.getColumnFamily().toString()
 				.split(ExternalTupleSet.VAR_ORDER_DELIM);
-		
+
 		BindingSet bindingSet = converter.convert(row, new VariableOrder(varOrder));
 		final QueryBindingSet temp = new QueryBindingSet(bindingSet);
-		
+
 		final QueryBindingSet bs = new QueryBindingSet();
 		for (final String var : temp.getBindingNames()) {
 			if (!tableVarMap.get(var).startsWith(ExternalTupleSet.CONST_PREFIX)) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
index cf98078..6c87182 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,20 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.FilterFunctionOptimizer;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import mvm.rya.indexing.accumulo.freetext.Tokenizer;
-import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import mvm.rya.indexing.external.PrecompJoinOptimizer;
-import mvm.rya.indexing.mongodb.MongoFreeTextIndexer;
-import mvm.rya.indexing.mongodb.MongoGeoIndexer;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
@@ -63,6 +49,20 @@ import org.openrdf.model.impl.URIImpl;
 
 import com.google.common.collect.Lists;
 
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.FilterFunctionOptimizer;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import mvm.rya.indexing.external.PrecompJoinOptimizer;
+import mvm.rya.indexing.mongodb.MongoFreeTextIndexer;
+import mvm.rya.indexing.mongodb.MongoGeoIndexer;
+
 /**
  * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
  */
@@ -88,17 +88,17 @@ public class ConfigUtils {
     public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions";
     public static final String TEMPORAL_TABLENAME = "sc.temporal.index";
     public static final String ENTITY_TABLENAME = "sc.entity.index";
-    
+
     public static final String USE_GEO = "sc.use_geo";
     public static final String USE_FREETEXT = "sc.use_freetext";
     public static final String USE_TEMPORAL = "sc.use_temporal";
     public static final String USE_ENTITY = "sc.use_entity";
     public static final String USE_PCJ = "sc.use_pcj";
     public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
-    
+
     public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
     public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
-    
+
     public static final String USE_MOCK_INSTANCE = ".useMockInstance";
 
     public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
@@ -118,22 +118,22 @@ public class ConfigUtils {
     public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
 
     public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
-    
+
     public static final String USE_MONGO = "sc.useMongo";
 
-    public static boolean isDisplayQueryPlan(Configuration conf){
+    public static boolean isDisplayQueryPlan(final Configuration conf){
         return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
     }
-    
+
     /**
      * get a value from the configuration file and throw an exception if the value does not exist.
-     * 
+     *
      * @param conf
      * @param key
      * @return
      */
-    private static String getStringCheckSet(Configuration conf, String key) {
-        String value = conf.get(key);
+    private static String getStringCheckSet(final Configuration conf, final String key) {
+        final String value = conf.get(key);
         Validate.notNull(value, key + " not set");
         return value;
     }
@@ -146,9 +146,9 @@ public class ConfigUtils {
      * @throws AccumuloSecurityException
      * @throws TableExistsException
      */
-    public static boolean createTableIfNotExists(Configuration conf, String tablename) throws AccumuloException, AccumuloSecurityException,
+    public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException,
             TableExistsException {
-        TableOperations tops = getConnector(conf).tableOperations();
+        final TableOperations tops = getConnector(conf).tableOperations();
         if (!tops.exists(tablename)) {
             logger.info("Creating table: " + tablename);
             tops.create(tablename);
@@ -156,102 +156,102 @@ public class ConfigUtils {
         }
         return false;
     }
-    
-    private static String getIndexTableName(Configuration conf, String indexTableNameConf, String altSuffix){
+
+    private static String getIndexTableName(final Configuration conf, final String indexTableNameConf, final String altSuffix){
         String value = conf.get(indexTableNameConf);
         if (value == null){
-            String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+            final String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
             Validate.notNull(defaultTableName, indexTableNameConf + " not set and " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set.  Cannot generate table name.");
             value = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX) + altSuffix;
         }
         return value;
     }
 
-    public static String getFreeTextDocTablename(Configuration conf) {
+    public static String getFreeTextDocTablename(final Configuration conf) {
         return getIndexTableName(conf, FREE_TEXT_DOC_TABLENAME, "freetext");
     }
 
-    public static String getFreeTextTermTablename(Configuration conf) {
+    public static String getFreeTextTermTablename(final Configuration conf) {
         return getIndexTableName(conf, FREE_TEXT_TERM_TABLENAME, "freetext_term");
     }
 
-    public static int getFreeTextTermLimit(Configuration conf) {
+    public static int getFreeTextTermLimit(final Configuration conf) {
         return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
     }
 
-    public static String getGeoTablename(Configuration conf) {
+    public static String getGeoTablename(final Configuration conf) {
         return getIndexTableName(conf, GEO_TABLENAME, "geo");
     }
-    
-    public static String getTemporalTableName(Configuration conf) {
+
+    public static String getTemporalTableName(final Configuration conf) {
         return getIndexTableName(conf, TEMPORAL_TABLENAME, "temporal");
     }
-    
-    
-    public static String getEntityTableName(Configuration conf) {
+
+
+    public static String getEntityTableName(final Configuration conf) {
         return getIndexTableName(conf, ENTITY_TABLENAME, "entity");
     }
 
 
-    public static Set<URI> getFreeTextPredicates(Configuration conf) {
+    public static Set<URI> getFreeTextPredicates(final Configuration conf) {
         return getPredicates(conf, FREETEXT_PREDICATES_LIST);
     }
 
-    public static Set<URI> getGeoPredicates(Configuration conf) {
+    public static Set<URI> getGeoPredicates(final Configuration conf) {
         return getPredicates(conf, GEO_PREDICATES_LIST);
     }
     /**
-     * Used for indexing statements about date & time instances and intervals. 
+     * Used for indexing statements about date & time instances and intervals.
      * @param conf
      * @return Set of predicate URI's whose objects should be date time literals.
      */
-    public static Set<URI> getTemporalPredicates(Configuration conf) {
+    public static Set<URI> getTemporalPredicates(final Configuration conf) {
         return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
     }
 
-    private static Set<URI> getPredicates(Configuration conf, String confName) {
-        String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
-        Set<URI> predicates = new HashSet<URI>();
-        for (String prediateString : validPredicateStrings) {
+    private static Set<URI> getPredicates(final Configuration conf, final String confName) {
+        final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
+        final Set<URI> predicates = new HashSet<URI>();
+        for (final String prediateString : validPredicateStrings) {
             predicates.add(new URIImpl(prediateString));
         }
         return predicates;
     }
 
-    public static Tokenizer getFreeTextTokenizer(Configuration conf) {
-        Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
+    public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
+        final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
         return ReflectionUtils.newInstance(c, conf);
     }
 
-    public static BatchWriter createDefaultBatchWriter(String tablename, Configuration conf) throws TableNotFoundException,
+    public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException,
             AccumuloException, AccumuloSecurityException {
-        Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
-        Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
-        Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
-        Connector connector = ConfigUtils.getConnector(conf);
+        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+        final Connector connector = ConfigUtils.getConnector(conf);
         return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
     }
 
-    public static MultiTableBatchWriter createMultitableBatchWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException {
-        Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
-        Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
-        Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
-        Connector connector = ConfigUtils.getConnector(conf);
+    public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+        final Connector connector = ConfigUtils.getConnector(conf);
         return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
     }
 
-    public static Scanner createScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException,
+    public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
             TableNotFoundException {
-        Connector connector = ConfigUtils.getConnector(conf);
-        Authorizations auths = ConfigUtils.getAuthorizations(conf);
+        final Connector connector = ConfigUtils.getConnector(conf);
+        final Authorizations auths = ConfigUtils.getAuthorizations(conf);
         return connector.createScanner(tablename, auths);
 
     }
 
-	public static BatchScanner createBatchScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException,
+	public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
 			TableNotFoundException {
-		Connector connector = ConfigUtils.getConnector(conf);
-		Authorizations auths = ConfigUtils.getAuthorizations(conf);
+		final Connector connector = ConfigUtils.getConnector(conf);
+		final Authorizations auths = ConfigUtils.getAuthorizations(conf);
 		Integer numThreads = null;
 		if (conf instanceof RdfCloudTripleStoreConfiguration)
 			numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
@@ -260,123 +260,123 @@ public class ConfigUtils {
 		return connector.createBatchScanner(tablename, auths, numThreads);
 	}
 
-    public static int getWriterMaxWriteThreads(Configuration conf) {
+    public static int getWriterMaxWriteThreads(final Configuration conf) {
         return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
     }
 
-    public static long getWriterMaxLatency(Configuration conf) {
+    public static long getWriterMaxLatency(final Configuration conf) {
         return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
     }
 
-    public static long getWriterMaxMemory(Configuration conf) {
+    public static long getWriterMaxMemory(final Configuration conf) {
         return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
     }
 
-    public static String getUsername(JobContext job) {
+    public static String getUsername(final JobContext job) {
         return getUsername(job.getConfiguration());
     }
 
-    public static String getUsername(Configuration conf) {
+    public static String getUsername(final Configuration conf) {
         return conf.get(CLOUDBASE_USER);
     }
 
-    public static Authorizations getAuthorizations(JobContext job) {
+    public static Authorizations getAuthorizations(final JobContext job) {
         return getAuthorizations(job.getConfiguration());
     }
 
-    public static Authorizations getAuthorizations(Configuration conf) {
-        String authString = conf.get(CLOUDBASE_AUTHS, "");
+    public static Authorizations getAuthorizations(final Configuration conf) {
+        final String authString = conf.get(CLOUDBASE_AUTHS, "");
         if (authString.isEmpty()) {
             return new Authorizations();
         }
         return new Authorizations(authString.split(","));
     }
 
-    public static Instance getInstance(JobContext job) {
+    public static Instance getInstance(final JobContext job) {
         return getInstance(job.getConfiguration());
     }
 
-    public static Instance getInstance(Configuration conf) {
+    public static Instance getInstance(final Configuration conf) {
         if (useMockInstance(conf)) {
             return new MockInstance(conf.get(CLOUDBASE_INSTANCE));
         }
         return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS));
     }
 
-    public static String getPassword(JobContext job) {
+    public static String getPassword(final JobContext job) {
         return getPassword(job.getConfiguration());
     }
 
-    public static String getPassword(Configuration conf) {
+    public static String getPassword(final Configuration conf) {
         return conf.get(CLOUDBASE_PASSWORD, "");
     }
 
-    public static Connector getConnector(JobContext job) throws AccumuloException, AccumuloSecurityException {
+    public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
         return getConnector(job.getConfiguration());
     }
 
-    public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
-        Instance instance = ConfigUtils.getInstance(conf);
+    public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+        final Instance instance = ConfigUtils.getInstance(conf);
 
         return instance.getConnector(getUsername(conf), getPassword(conf));
     }
 
-    public static boolean useMockInstance(Configuration conf) {
+    public static boolean useMockInstance(final Configuration conf) {
         return conf.getBoolean(USE_MOCK_INSTANCE, false);
     }
 
-    private static int getNumPartitions(Configuration conf) {
+    private static int getNumPartitions(final Configuration conf) {
         return conf.getInt(NUM_PARTITIONS, 25);
     }
 
-    public static int getFreeTextDocNumPartitions(Configuration conf) {
+    public static int getFreeTextDocNumPartitions(final Configuration conf) {
         return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
     }
 
-    public static int getFreeTextTermNumPartitions(Configuration conf) {
+    public static int getFreeTextTermNumPartitions(final Configuration conf) {
         return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
     }
 
-    public static int getGeoNumPartitions(Configuration conf) {
+    public static int getGeoNumPartitions(final Configuration conf) {
         return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf));
     }
-    
-    public static boolean getUseGeo(Configuration conf) {
+
+    public static boolean getUseGeo(final Configuration conf) {
         return conf.getBoolean(USE_GEO, false);
     }
-    
-    public static boolean getUseFreeText(Configuration conf) {
+
+    public static boolean getUseFreeText(final Configuration conf) {
         return conf.getBoolean(USE_FREETEXT, false);
     }
-    
-    public static boolean getUseTemporal(Configuration conf) {
+
+    public static boolean getUseTemporal(final Configuration conf) {
         return conf.getBoolean(USE_TEMPORAL, false);
     }
-    
-    public static boolean getUseEntity(Configuration conf) {
+
+    public static boolean getUseEntity(final Configuration conf) {
         return conf.getBoolean(USE_ENTITY, false);
     }
-    
-    public static boolean getUsePCJ(Configuration conf) {
+
+    public static boolean getUsePCJ(final Configuration conf) {
         return conf.getBoolean(USE_PCJ, false);
     }
-    
-    public static boolean getUseOptimalPCJ(Configuration conf) {
+
+    public static boolean getUseOptimalPCJ(final Configuration conf) {
         return conf.getBoolean(USE_OPTIMAL_PCJ, false);
     }
-    
-    public static boolean getUseMongo(Configuration conf) {
+
+    public static boolean getUseMongo(final Configuration conf) {
         return conf.getBoolean(USE_MONGO, false);
     }
-    
-    
-    public static void setIndexers(RdfCloudTripleStoreConfiguration conf) {
-        
-        List<String> indexList = Lists.newArrayList();
-        List<String> optimizers = Lists.newArrayList();
-        
-        boolean useFilterIndex = false; 
-        
+
+
+    public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+
+        final List<String> indexList = Lists.newArrayList();
+        final List<String> optimizers = Lists.newArrayList();
+
+        boolean useFilterIndex = false;
+
         if (ConfigUtils.getUseMongo(conf)) {
             if (getUseGeo(conf)) {
                 indexList.add(MongoGeoIndexer.class.getName());
@@ -408,7 +408,7 @@ public class ConfigUtils {
             }
 
         }
-        
+
         if (useFilterIndex) {
             optimizers.add(FilterFunctionOptimizer.class.getName());
         }
@@ -418,12 +418,8 @@ public class ConfigUtils {
             optimizers.add(EntityOptimizer.class.getName());
 
         }
-        
+
         conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
         conf.setStrings(AccumuloRdfConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
-        
     }
-    
-
-    
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
deleted file mode 100644
index b9e2351..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.accumulo;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.openrdf.query.BindingSet;
-
-import mvm.rya.indexing.external.BindingSetDecorator;
-
-/**
- * Decorates a {@link BindingSet} with a collection of visibilities.
- */
-@ParametersAreNonnullByDefault
-public class VisibilityBindingSet extends BindingSetDecorator {
-    private static final long serialVersionUID = 1L;
-    private final String visibility;
-    private volatile int hashCode;
-
-    /**
-     * @param set - Decorates the {@link BindingSet} with no visibilities.
-     */
-    public VisibilityBindingSet(final BindingSet set) {
-        this(set, "");
-    }
-
-    /**
-     * Creates a new {@link VisibilityBindingSet}
-     * @param set - The {@link BindingSet} to decorate
-     * @param visibility - The visibilities on the {@link BindingSet} (not null)
-     */
-    public VisibilityBindingSet(final BindingSet set, final String visibility) {
-        super(set);
-        this.visibility = checkNotNull(visibility);
-    }
-
-    /**
-     * @return - The Visibilities on the {@link BindingSet}
-     */
-    public String getVisibility() {
-        return visibility;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        } else if(o instanceof VisibilityBindingSet) {
-            final VisibilityBindingSet other = (VisibilityBindingSet) o;
-            return set.equals(other) && visibility.equals(other.getVisibility());
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = hashCode;
-        if(result == 0) {
-            result = 31 * result + visibility.hashCode();
-            result = 31 * result + super.hashCode();
-            hashCode = result;
-        }
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder(super.toString());
-        sb.append("\n  Visibility: " + getVisibility() + "\n");
-        return sb.toString();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
deleted file mode 100644
index b4909bd..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.external;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.openrdf.model.Value;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-
-/**
- * Abstracts out the decoration of a {@link BindingSet}.
- */
-public abstract class BindingSetDecorator implements BindingSet {
-    private static final long serialVersionUID = 1L;
-    protected final BindingSet set;
-    private volatile int hashCode;
-
-    /**
-     * Constructs a new {@link BindingSetDecorator}, decorating the provided
-     * {@link BindingSet}.
-     * @param set - The {@link BindingSet} to be decorated. (not null)
-     */
-    public BindingSetDecorator(final BindingSet set) {
-        this.set = checkNotNull(set);
-    }
-
-    @Override
-    public Iterator<Binding> iterator() {
-        return set.iterator();
-    }
-
-    @Override
-    public Set<String> getBindingNames() {
-        return set.getBindingNames();
-    }
-
-    @Override
-    public Binding getBinding(final String bindingName) {
-        return set.getBinding(bindingName);
-    }
-
-    @Override
-    public boolean hasBinding(final String bindingName) {
-        return set.hasBinding(bindingName);
-    }
-
-    @Override
-    public Value getValue(final String bindingName) {
-        return set.getValue(bindingName);
-    }
-
-    @Override
-    public int size() {
-        return set.size();
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if(!(o instanceof BindingSetDecorator)) {
-            return false;
-        }
-        final BindingSetDecorator other = (BindingSetDecorator) o;
-        return set.equals(other.set);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = hashCode;
-        if(result == 0) {
-            result = 31 * result + set.hashCode();
-            hashCode = result;
-        }
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("  names: ");
-        for (final String name : getBindingNames()) {
-            sb.append("\n    [name]: " + name + "  ---  [value]: " + getBinding(name).getValue().toString());
-        }
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
index f8c6c77..75497da 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
@@ -27,27 +27,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator;
-import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
-import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector;
-import mvm.rya.indexing.IndexPlanValidator.TupleReArranger;
-import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector;
-import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-import mvm.rya.indexing.external.tupleSet.PcjTables;
-import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
-import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
-import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.Dataset;
 import org.openrdf.query.MalformedQueryException;
@@ -82,6 +69,19 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator;
+import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
+import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector;
+import mvm.rya.indexing.IndexPlanValidator.TupleReArranger;
+import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector;
+import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
+import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
+
 /**
  * {@link QueryOptimizer} which matches TupleExpressions associated with
  * pre-computed queries to sub-queries of a given query. Each matched sub-query

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
new file mode 100644
index 0000000..cce0a81
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
+
+/**
+ * Updates the state of the Precomputed Join indices that are used by Rya.
+ */
+@ParametersAreNonnullByDefault
+public class PrecomputedJoinIndexer implements AccumuloIndexer {
+    private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class);
+
+    /**
+     * This configuration object must be set before {@link #init()} is invoked.
+     * It is set by {@link #setConf(Configuration)}.
+     */
+    private Optional<Configuration> conf = Optional.absent();
+
+    /**
+     * The Accumulo Connector that must be used when accessing an Accumulo storage.
+     * This value is provided by {@link #setConnector(Connector)}.
+     */
+    private Optional<Connector> accumuloConn = Optional.absent();
+
+    /**
+     * Provides access to the {@link Configuration} that was provided to this class
+     * using {@link #setConf(Configuration)}.
+     */
+    private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() {
+        @Override
+        public Configuration get() {
+            return getConf();
+        }
+    };
+
+    /**
+     * Provides access to the Accumulo {@link Connector} that was provided to
+     * this class using {@link #setConnector(Connector)}.
+     */
+    private final Supplier<Connector> accumuloSupplier = new Supplier<Connector>() {
+        @Override
+        public Connector get() {
+            return accumuloConn.get();
+        }
+    };
+
+    /**
+     * Creates and grants access to the {@link PrecomputedJoinStorage} that will be used
+     * to interact with the PCJ results that are stored and used by Rya.
+     */
+    private final PrecomputedJoinStorageSupplier pcjStorageSupplier =
+            new PrecomputedJoinStorageSupplier(
+                    configSupplier,
+                    new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier));
+
+    /**
+     * Creates and grants access to the {@link PrecomputedJoinUpdater} that will
+     * be used to update the state stored within the PCJ tables that are stored
+     * in Accumulo.
+     */
+    private final PrecomputedJoinUpdaterSupplier pcjUpdaterSupplier =
+            new PrecomputedJoinUpdaterSupplier(
+                    configSupplier,
+                    new FluoPcjUpdaterSupplier(configSupplier));
+
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = Optional.fromNullable(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return this.conf.get();
+    }
+
+    /**
+     * Set the connector that will be used by {@link AccumuloPcjStorage} if the
+     * application is configured to store the PCJs within Accumulo.
+     */
+    @Override
+    public void setConnector(final Connector connector) {
+        checkNotNull(connector);
+        accumuloConn = Optional.of( connector );
+    }
+
+    /**
+     * This is invoked when the host {@link RyaDAO#init()} method is invoked.
+     */
+    @Override
+    public void init() {
+        pcjStorageSupplier.get();
+        pcjUpdaterSupplier.get();
+    }
+
+    @Override
+    public void storeStatement(final RyaStatement statement) throws IOException {
+        checkNotNull(statement);
+        storeStatements( Collections.singleton(statement) );
+    }
+
+    @Override
+    public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+        checkNotNull(statements);
+        try {
+            pcjUpdaterSupplier.get().addStatements(statements);
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not update the PCJs by adding the provided statements.", e);
+        }
+    }
+
+    @Override
+    public void deleteStatement(final RyaStatement statement) throws IOException {
+        checkNotNull(statement);
+        try {
+            pcjUpdaterSupplier.get().deleteStatements( Collections.singleton(statement) );
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not update the PCJs by removing the provided statement.", e);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            pcjUpdaterSupplier.get().flush();
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not flush the PCJ Updater.", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            pcjStorageSupplier.get().close();
+        } catch (final PCJStorageException e) {
+            log.error("Could not close the PCJ Storage instance.", e);
+        }
+
+        try {
+            pcjUpdaterSupplier.get().close();
+        } catch (final PcjUpdateException e) {
+            log.error("Could not close the PCJ Updater instance.", e);
+        }
+    }
+
+    /**
+     * This is invoked when the host {@link RyaDAO#destroy()} method is invoked.
+     */
+    @Override
+    public void destroy() {
+        close();
+    }
+
+    /**
+     * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}.
+     */
+    @Override
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+        final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
+
+        try {
+            for(final String pcjId : storage.listPcjs()) {
+                try {
+                    storage.purge(pcjId);
+                } catch(final PCJStorageException e) {
+                    log.error("Could not purge the PCJ index with id: " + pcjId, e);
+                }
+            }
+        } catch (final PCJStorageException e) {
+            log.error("Could not purge the PCJ indicies because they could not be listed.", e);
+        }
+    }
+
+    /**
+     * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}.
+     */
+    @Override
+    public void dropAndDestroy() {
+        final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
+
+        try {
+            for(final String pcjId : storage.listPcjs()) {
+                try {
+                    storage.dropPcj(pcjId);
+                } catch(final PCJStorageException e) {
+                    log.error("Could not delete the PCJ index with id: " + pcjId, e);
+                }
+            }
+        } catch(final PCJStorageException e) {
+            log.error("Could not delete the PCJ indicies because they could not be listed.", e);
+        }
+    }
+
+    @Override
+    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
+        // We do not need to use the writer that also writes to the core RYA tables.
+    }
+
+    @Override
+    public void dropGraph(final RyaURI... graphs) {
+        log.warn("PCJ indices do not store Graph metadata, so graph results can not be dropped.");
+    }
+
+    @Override
+    public String getTableName() {
+        // This method makes assumptions about how PCJs are stored. It's only
+        // used by AccumuloRyaDAO to purge data, so it should be replaced with
+        // a purge() method.
+        log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented.");
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
new file mode 100644
index 0000000..c56f574
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+/**
+ * Inspects the {@link Configuration} object that is provided to all instances
+ * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer}
+ * specific values.
+ */
+@ParametersAreNonnullByDefault
+public class PrecomputedJoinIndexerConfig {
+
+    /**
+     * Enumerates the different methodologies implemented to store the PCJ indices.
+     */
+    public static enum PrecomputedJoinStorageType {
+        /**
+         * Stores each PCJ within an Accumulo table.
+         */
+        ACCUMULO;
+    }
+
+    /**
+     * Enumerates the different methodologies implemented to update the PCJ indices.
+     */
+    public static enum PrecomputedJoinUpdaterType {
+        /**
+         * Incrementally updates the PCJs in pseudo-realtime as new adds/deletes are encountered.
+         */
+        FLUO;
+    }
+
+    // Indicates which implementation of PrecomputedJoinStorage to use.
+    public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+
+    // Indicates which implementation of PrecomputedJoinUpdater to use.
+    public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+    // The configuration object that is provided to Secondary Indexing implementations.
+    private final Configuration config;
+
+    /**
+     * Constructs an instance of {@link PrecomputedJoinIndexerConfig}.
+     *
+     * @param config - The {@link Configuration} object that is provided to
+     *   all instances of {@link RyaSecondaryIndexer}. It will be inspected
+     *   for {@link PrecomputedJoinIndexer} specific values. (not null)
+     */
+    public PrecomputedJoinIndexerConfig(final Configuration config) {
+        this.config = checkNotNull(config);
+    }
+
+    /**
+     * @return The type of {@link PrecomputedJoinStorage} to use.
+     */
+    public Optional<PrecomputedJoinStorageType> getPcjStorageType() {
+        final String storageTypeString = config.get(PCJ_STORAGE_TYPE);
+        if(storageTypeString == null) {
+            return Optional.absent();
+        }
+
+        final PrecomputedJoinStorageType storageType = PrecomputedJoinStorageType.valueOf(storageTypeString);
+        return Optional.fromNullable(storageType);
+    }
+
+    /**
+     * @return The type of {@link PrecomputedJoinUpdater} to use.
+     */
+    public Optional<PrecomputedJoinUpdaterType> getPcjUpdaterType() {
+        final String updaterTypeString = config.get(PCJ_UPDATER_TYPE);
+        if(updaterTypeString == null) {
+            return Optional.absent();
+        }
+
+        final PrecomputedJoinUpdaterType updaterType = PrecomputedJoinUpdaterType.valueOf(updaterTypeString);
+        return Optional.fromNullable(updaterType);
+    }
+
+    /**
+     * @return The configuration object that has been wrapped.
+     */
+    public Configuration getConfig() {
+        return config;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
new file mode 100644
index 0000000..bf10c84
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+
+/**
+ * Creates an instance of {@link PrecomputedJoinStorage} based on the application's configuration.
+ */
+public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinStorage> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final AccumuloPcjStorageSupplier accumuloSupplier;
+
+    /**
+     * Constructs an instance of {@link PrecomputedJoinStorageSupplier}.
+     *
+     * @param configSupplier - Provides access to the configuration of the
+     *   application used to initialize the storage. (not null)
+     * @param accumuloSupplier - Used to create an Accumulo instance of the
+     *   storage if that is the configured type. (not null)
+     */
+    public PrecomputedJoinStorageSupplier(
+            final Supplier<Configuration> configSupplier,
+            final AccumuloPcjStorageSupplier accumuloSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.accumuloSupplier = checkNotNull(accumuloSupplier);
+    }
+
+    @Override
+    public PrecomputedJoinStorage get() {
+        // Ensure a configuration has been set.
+        final Configuration config = configSupplier.get();
+        checkNotNull(config, "Could not build the PrecomputedJoinStorage until the PrecomputedJoinIndexer has been configured.");
+
+        final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
+
+        // Ensure the storage type has been set.
+        final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType();
+        checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
+                "' property must have one of the following values: " + PrecomputedJoinStorageType.values());
+
+        // Create and return the configured storage.
+        switch(storageType.get()) {
+            case ACCUMULO:
+                return accumuloSupplier.get();
+
+            default:
+                throw new IllegalArgumentException("Unsupported PrecomputedJoinStorageType: " + storageType.get());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
new file mode 100644
index 0000000..cabadb4
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
+
+/**
+ * Creates instances of {@link PrecomputedJoinUpdater} based on the application's configuration.
+ */
+public class PrecomputedJoinUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final FluoPcjUpdaterSupplier fluoSupplier;
+
+    /**
+     * Creates an instance of {@link PrecomputedJoinUpdaterSupplier}.
+     *
+     * @param configSupplier - Provides access to the configuration of the
+     *   application used to initialize the updater. (not null)
+     * @param fluoSupplier - Used to create a Fluo instance of the updater
+     *   if that is the configured type. (not null)
+     */
+    public PrecomputedJoinUpdaterSupplier(
+            final Supplier<Configuration> configSupplier,
+            final FluoPcjUpdaterSupplier fluoSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.fluoSupplier = checkNotNull(fluoSupplier);
+    }
+
+    /**
+     * Builds the {@link PrecomputedJoinUpdater} selected by the application's configuration.
+     *
+     * @return The updater produced by the supplier that matches the configured updater type.
+     * @throws NullPointerException The configuration supplier returned {@code null}.
+     * @throws IllegalArgumentException The updater type is not configured, or it is
+     *   a type this supplier does not know how to build.
+     */
+    @Override
+    public PrecomputedJoinUpdater get() {
+        // Ensure a configuration has been set.
+        final Configuration config = configSupplier.get();
+        checkNotNull(config, "Can not build the PrecomputedJoinUpdater until the PrecomputedJoinIndexer has been configured.");
+
+        final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
+
+        // Ensure an updater type has been set.
+        // Arrays.toString(...) is required here: concatenating the array directly would
+        // print its identity hash instead of the names of the legal values.
+        final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType();
+        checkArgument(updaterType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE +
+                "' property must have one of the following values: " +
+                java.util.Arrays.toString(PrecomputedJoinUpdaterType.values()));
+
+        // Create and return the configured updater.
+        switch(updaterType.get()) {
+            case FLUO:
+                return fluoSupplier.get();
+
+            default:
+                throw new IllegalArgumentException("Unsupported PrecomputedJoinUpdaterType: " + updaterType.get());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
new file mode 100644
index 0000000..6e79748
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * A {@link PrecomputedJoinStorage} whose operations are carried out by
+ * delegating to {@link PcjTables} over an Accumulo {@link Connector}.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloPcjStorage implements PrecomputedJoinStorage {
+
+    // Produces the value that is used as the ID of each newly created PCJ.
+    private final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
+    // Every storage operation is forwarded to this object.
+    private final PcjTables tables = new PcjTables();
+
+    private final Connector connector;
+    private final String instanceName;
+
+    /**
+     * Constructs an instance of {@link AccumuloPcjStorage}.
+     *
+     * @param accumuloConn - The connector handed to {@link PcjTables} for every operation. (not null)
+     * @param ryaInstanceName - The name of the Rya instance whose PCJs are accessed. (not null)
+     */
+    public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) {
+        connector = checkNotNull(accumuloConn);
+        instanceName = checkNotNull(ryaInstanceName);
+    }
+
+    /**
+     * @return Whatever {@link PcjTables} lists for this storage's connector and Rya instance name.
+     */
+    @Override
+    public List<String> listPcjs() throws PCJStorageException {
+        return tables.listPcjTables(connector, instanceName);
+    }
+
+    /**
+     * Creates a new PCJ table. The name generated by the {@link PcjTableNameFactory}
+     * for this Rya instance is used both as the table handed to {@link PcjTables}
+     * and as the ID returned to the caller.
+     *
+     * @param sparql - The SPARQL query stored with the new PCJ.
+     * @param varOrders - The variable orders stored with the new PCJ.
+     * @return The ID of the newly created PCJ.
+     */
+    @Override
+    public String createPcj(final String sparql, final Set<VariableOrder> varOrders) throws PCJStorageException {
+        final String newPcjId = tableNameFactory.makeTableName(instanceName);
+        tables.createPcjTable(connector, newPcjId, varOrders, sparql);
+        return newPcjId;
+    }
+
+    /**
+     * @param pcjId - Identifies the PCJ whose metadata is fetched.
+     * @return The metadata {@link PcjTables} reports for {@code pcjId}.
+     */
+    @Override
+    public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
+        return tables.getPcjMetadata(connector, pcjId);
+    }
+
+    /**
+     * Forwards the results to {@link PcjTables} so they are added to the PCJ.
+     *
+     * @param pcjId - Identifies the PCJ the results are added to.
+     * @param results - The binding sets to add.
+     */
+    @Override
+    public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+        tables.addResults(connector, pcjId, results);
+    }
+
+    /**
+     * Asks {@link PcjTables} to purge the PCJ's table.
+     *
+     * @param pcjId - Identifies the PCJ to purge.
+     */
+    @Override
+    public void purge(final String pcjId) throws PCJStorageException {
+        tables.purgePcjTable(connector, pcjId);
+    }
+
+    /**
+     * Asks {@link PcjTables} to drop the PCJ's table.
+     *
+     * @param pcjId - Identifies the PCJ to drop.
+     */
+    @Override
+    public void dropPcj(final String pcjId) throws PCJStorageException {
+        tables.dropPcjTable(connector, pcjId);
+    }
+
+    /**
+     * Does nothing; this storage holds no resource that has to be released.
+     */
+    @Override
+    public void close() throws PCJStorageException {
+        // Accumulo Connectors don't require closing.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
new file mode 100644
index 0000000..73b50db
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+
+/**
+ * Reads the values required to initialize an {@link AccumuloPcjStorage}
+ * out of a {@link Configuration}.
+ */
+public class AccumuloPcjStorageConfig {
+
+    private final RdfCloudTripleStoreConfiguration config;
+
+    /**
+     * Constructs an instance of {@link AccumuloPcjStorageConfig}.
+     *
+     * @param config - The configuration values that will be interpreted. (not null)
+     */
+    public AccumuloPcjStorageConfig(final Configuration config) {
+        checkNotNull(config);
+
+        // Wrapping the config so that we can use its getTablePrefix() method.
+        this.config = new TablePrefixConfig(config);
+    }
+
+    /**
+     * @return The Rya Instance name the storage grants access to. This is
+     *   the table prefix reported by the wrapped configuration.
+     */
+    public String getRyaInstanceName() {
+        return config.getTablePrefix();
+    }
+
+    /**
+     * Minimal concrete {@link RdfCloudTripleStoreConfiguration} that exists only
+     * so the enclosing class can call {@code getTablePrefix()}.
+     */
+    private static final class TablePrefixConfig extends RdfCloudTripleStoreConfiguration {
+
+        private TablePrefixConfig(final Configuration other) {
+            super(other);
+        }
+
+        @Override
+        public RdfCloudTripleStoreConfiguration clone() {
+            // NOTE(review): returning null breaks the usual clone() contract. It is
+            // tolerable only because this object is held in a private field of the
+            // enclosing class, which never clones it.
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
new file mode 100644
index 0000000..77b8f2e
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+
+/**
+ * Builds {@link AccumuloPcjStorage} instances from the values found in a
+ * {@link Configuration} and a supplied Accumulo {@link Connector}.
+ */
+public class AccumuloPcjStorageSupplier implements Supplier<AccumuloPcjStorage> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final Supplier<Connector> connectorSupplier;
+
+    /**
+     * Constructs an instance of {@link AccumuloPcjStorageSupplier}.
+     *
+     * @param configSupplier - Configures the {@link AccumuloPcjStorage} that is
+     *   supplied by this class. (not null)
+     * @param accumuloSupplier - Provides the {@link Connector} that is used by
+     *   the {@link AccumuloPcjStorage} that is supplied by this class. (not null)
+     */
+    public AccumuloPcjStorageSupplier(
+            final Supplier<Configuration> configSupplier,
+            final Supplier<Connector> accumuloSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.connectorSupplier = checkNotNull(accumuloSupplier);
+    }
+
+    /**
+     * Creates a new {@link AccumuloPcjStorage} for the configured Rya instance.
+     *
+     * @return A storage built from the supplied connector and the Rya instance
+     *   name read from the configuration.
+     * @throws NullPointerException Either supplier returned {@code null}.
+     * @throws IllegalArgumentException The configured storage type is missing or is not Accumulo.
+     */
+    @Override
+    public AccumuloPcjStorage get() {
+        // The application's configuration has to be available before anything can be built.
+        final Configuration config = checkNotNull(configSupplier.get(),
+                "Could not create a AccumuloPcjStorage because the application's configuration has not been provided yet.");
+
+        // This supplier only applies when Accumulo is the configured storage type.
+        final Optional<PrecomputedJoinStorageType> configuredType =
+                new PrecomputedJoinIndexerConfig(config).getPcjStorageType();
+        final boolean isAccumulo = configuredType.isPresent()
+                && (configuredType.get() == PrecomputedJoinStorageType.ACCUMULO);
+        checkArgument(isAccumulo,
+                "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
+                "' value be set to '" + PrecomputedJoinStorageType.ACCUMULO + "'.");
+
+        // The connector is fetched only after the configuration has been validated.
+        final Connector accumuloConn = checkNotNull(connectorSupplier.get(),
+                "The Accumulo Connector must be set before initializing the AccumuloPcjStorage.");
+
+        final AccumuloPcjStorageConfig storageConfig = new AccumuloPcjStorageConfig(config);
+        return new AccumuloPcjStorage(accumuloConn, storageConfig.getRyaInstanceName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
new file mode 100644
index 0000000..901bd61
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+
+import io.fluo.api.client.FluoClient;
+import mvm.rya.api.domain.RyaStatement;
+
+/**
+ * Updates the PCJ indices by forwarding the statement additions/removals to
+ * a Fluo application.
+ */
+@ParametersAreNonnullByDefault
+public class FluoPcjUpdater implements PrecomputedJoinUpdater {
+    private static final Logger log = Logger.getLogger(FluoPcjUpdater.class);
+
+    // Used to only print the unsupported delete operation once.
+    // NOTE(review): this is a plain field, so concurrent callers could each log
+    // the warning once - confirm whether this updater is shared between threads.
+    private boolean deleteWarningPrinted = false;
+
+    private final FluoClient fluoClient;
+    private final InsertTriples insertTriples = new InsertTriples();
+    private final String statementVis;
+
+    /**
+     * Constructs an instance of {@link FluoPcjUpdater}.
+     *
+     * @param fluoClient - A connection to the Fluo table new statements will be
+     *   inserted into and deleted from. (not null)
+     * @param statementVis - The visibility label that will be applied to all
+     *   statements that are inserted via the Fluo PCJ updater. (not null)
+     */
+    public FluoPcjUpdater(final FluoClient fluoClient, final String statementVis) {
+        this.fluoClient = checkNotNull(fluoClient);
+        this.statementVis = checkNotNull(statementVis);
+    }
+
+    /**
+     * Inserts the statements into the Fluo application, labelling them with the
+     * visibility this updater was constructed with.
+     *
+     * @param statements - The statements to forward to Fluo.
+     */
+    @Override
+    public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
+        insertTriples.insert(fluoClient, statements, Optional.of(statementVis));
+    }
+
+    /**
+     * Deletion is not supported: the statements are ignored, and a warning is
+     * logged the first time this method is invoked on this instance.
+     *
+     * @param statements - The statements that were requested to be deleted. (unused)
+     */
+    @Override
+    public void deleteStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
+        // The Fluo application does not support statement deletion.
+        if(deleteWarningPrinted) {
+            return;
+        }
+
+        log.warn("The Fluo PCJ updating application does not support Statement deletion, " +
+                "but you are trying to use that feature. This may result in your PCJ index " +
+                "no longer reflecting the Statements that are stored in the core Rya tables.");
+        deleteWarningPrinted = true;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void flush() {
+        // The Fluo application does not do any batching, so this doesn't do anything.
+    }
+
+    /**
+     * Closes the {@link FluoClient} this updater was constructed with.
+     */
+    @Override
+    public void close() {
+        fluoClient.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
new file mode 100644
index 0000000..4378a4a
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Typed, read-only access to the {@link Configuration} values that are needed
+ * to initialize a {@link FluoPcjUpdater}.
+ */
+public final class FluoPcjUpdaterConfig {
+
+    // Defines which Fluo application is running for this instance of Rya.
+    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+
+    // Values that define which Accumulo instance hosts the Fluo application's table.
+    // These reuse the keys already declared by ConfigUtils.
+    public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS;
+    public static final String ACCUMULO_INSTANCE = ConfigUtils.CLOUDBASE_INSTANCE;
+    public static final String ACCUMULO_USERNAME = ConfigUtils.CLOUDBASE_USER;
+    public static final String ACCUMULO_PASSWORD = ConfigUtils.CLOUDBASE_PASSWORD;
+
+    // Key holding the visibilities attached to statements that are inserted by the Fluo updater.
+    public static final String STATEMENT_VISIBILITY = ConfigUtils.CLOUDBASE_AUTHS;
+
+    // The configuration object that is provided to Secondary Indexing implementations.
+    private final Configuration config;
+
+    /**
+     * Constructs an instance of {@link FluoPcjUpdaterConfig}.
+     *
+     * @param config - The configuration values that will be interpreted. (not null)
+     */
+    public FluoPcjUpdaterConfig(final Configuration config) {
+        this.config = checkNotNull(config);
+    }
+
+    /**
+     * @return The name of the Fluo Application this instance of Rya is using
+     *   to incrementally update PCJs; absent if {@link #FLUO_APP_NAME} is not set.
+     */
+    public Optional<String> getFluoAppName() {
+        return lookup(FLUO_APP_NAME);
+    }
+
+    /**
+     * This value is the {@link #getAccumuloZookeepers()} value with
+     * {@code "/fluo"} appended to it.
+     *
+     * @return The zookeepers that are used to manage Fluo state; absent if
+     *   the Accumulo zookeepers have not been configured.
+     */
+    public Optional<String> getFluoZookeepers() {
+        final Optional<String> zookeepers = getAccumuloZookeepers();
+        if(zookeepers.isPresent()) {
+            return Optional.of( zookeepers.get() + "/fluo" );
+        }
+        return Optional.absent();
+    }
+
+    /**
+     * @return The zookeepers used to connect to the Accumulo instance that is
+     *   storing the state of the Fluo Application; absent if not configured.
+     */
+    public Optional<String> getAccumuloZookeepers() {
+        return lookup(ACCUMULO_ZOOKEEPERS);
+    }
+
+    /**
+     * @return The instance name of the Accumulo instance that is storing the
+     *   state of the Fluo Application; absent if not configured.
+     */
+    public Optional<String> getAccumuloInstance() {
+        return lookup(ACCUMULO_INSTANCE);
+    }
+
+    /**
+     * @return The username the indexer authenticates with when connecting to
+     *   the Accumulo instance that stores the state of the Fluo Application;
+     *   absent if not configured.
+     */
+    public Optional<String> getAccumuloUsername() {
+        return lookup(ACCUMULO_USERNAME);
+    }
+
+    /**
+     * @return The password the indexer authenticates with when connecting to
+     *   the Accumulo instance that stores the state of the Fluo Application;
+     *   absent if not configured.
+     */
+    public Optional<String> getAccumuloPassword() {
+        return lookup(ACCUMULO_PASSWORD);
+    }
+
+    /**
+     * @return The visibility labels that will be attached to the statements
+     *   that are inserted into the Fluo Application; absent if not configured.
+     */
+    public Optional<String> getStatementVisibility() {
+        return lookup(STATEMENT_VISIBILITY);
+    }
+
+    /**
+     * Reads a single value from the wrapped configuration.
+     *
+     * @param key - The name of the property to read.
+     * @return The property's value, or absent when the configuration returns {@code null} for it.
+     */
+    private Optional<String> lookup(final String key) {
+        return Optional.fromNullable(config.get(key));
+    }
+}
\ No newline at end of file