You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/12 21:25:35 UTC

[7/8] incubator-rya git commit: RYA-303 Mongo PCJ Support. Closes #172.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
deleted file mode 100644
index 9311200..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.accumulo;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.utils.ConnectorFactory;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.instance.RyaDetails;
-import org.apache.rya.indexing.FilterFunctionOptimizer;
-import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
-import org.apache.rya.indexing.accumulo.entity.EntityOptimizer;
-import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
-import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import org.apache.rya.indexing.entity.EntityIndexOptimizer;
-import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
-import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
-import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.URIImpl;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
-/**
- * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
- * Soon will deprecate this class.  Use installer for the set methods, use {@link RyaDetails} for the get methods.
- * New code must separate parameters that are set at Rya install time from that which is specific to the client.
- * Also Accumulo index tables are pushed down to the implementation and not configured in conf.
- */
-public class ConfigUtils {
-    private static final Logger logger = Logger.getLogger(ConfigUtils.class);
-
-    /**
-     * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX;
-    
-    /**
-     * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE;
-    
-    /**
-     * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS;
-    
-    /**
-     * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER;
-    
-    /**
-     * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD;
-    /**
-     * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead.
-     */
-    @Deprecated
-    public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH;
-
-    public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
-    public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
-    public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
-
-    public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
-
-    public static final String USE_FREETEXT = "sc.use_freetext";
-    public static final String USE_TEMPORAL = "sc.use_temporal";
-    public static final String USE_ENTITY = "sc.use_entity";
-    public static final String USE_PCJ = "sc.use_pcj";
-    public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
-    public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
-
-    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
-    public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
-    public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
-    public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
-
-    public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE;
-
-    public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
-
-    private static final int WRITER_MAX_WRITE_THREADS = 1;
-    private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
-    private static final long WRITER_MAX_MEMORY = 10000L;
-
-    public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
-
-    public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
-    public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
-    public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
-
-    public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
-
-    public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
-
-    public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
-
-    public static final String USE_MONGO = "sc.useMongo";
-
-    public static boolean isDisplayQueryPlan(final Configuration conf) {
-        return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
-    }
-
-    /**
-     * get a value from the configuration file and throw an exception if the
-     * value does not exist.
-     *
-     * @param conf
-     * @param key
-     * @return
-     */
-    private static String getStringCheckSet(final Configuration conf, final String key) {
-        final String value = conf.get(key);
-        requireNonNull(value, key + " not set");
-        return value;
-    }
-
-    /**
-     * @param conf
-     * @param tablename
-     * @return if the table was created
-     * @throws AccumuloException
-     * @throws AccumuloSecurityException
-     * @throws TableExistsException
-     */
-    public static boolean createTableIfNotExists(final Configuration conf, final String tablename)
-            throws AccumuloException, AccumuloSecurityException, TableExistsException {
-        final TableOperations tops = getConnector(conf).tableOperations();
-        if (!tops.exists(tablename)) {
-            logger.info("Creating table: " + tablename);
-            tops.create(tablename);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Lookup the table name prefix in the conf and throw an error if it is
-     * null. Future, get table prefix from RyaDetails -- the Rya instance name
-     * -- also getting info from the RyaDetails should happen within
-     * RyaSailFactory and not ConfigUtils.
-     * 
-     * @param conf
-     *            Rya configuration map where it extracts the prefix (instance
-     *            name)
-     * @return index table prefix corresponding to this Rya instance
-     */
-    public static String getTablePrefix(final Configuration conf) {
-        final String tablePrefix;
-        tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
-        requireNonNull(tablePrefix,
-                "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set.  Cannot generate table name.");
-        return tablePrefix;
-    }
-
-    public static int getFreeTextTermLimit(final Configuration conf) {
-        return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
-    }
-
-    public static Set<URI> getFreeTextPredicates(final Configuration conf) {
-        return getPredicates(conf, FREETEXT_PREDICATES_LIST);
-    }
-
-    public static Set<URI> getGeoPredicates(final Configuration conf) {
-        return getPredicates(conf, GEO_PREDICATES_LIST);
-    }
-
-    /**
-     * Used for indexing statements about date & time instances and intervals.
-     * 
-     * @param conf
-     * @return Set of predicate URI's whose objects should be date time
-     *         literals.
-     */
-    public static Set<URI> getTemporalPredicates(final Configuration conf) {
-        return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
-    }
-
-    protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
-        final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
-        final Set<URI> predicates = new HashSet<>();
-        for (final String prediateString : validPredicateStrings) {
-            predicates.add(new URIImpl(prediateString));
-        }
-        return predicates;
-    }
-
-    public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
-        final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
-        return ReflectionUtils.newInstance(c, conf);
-    }
-
-    public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf)
-            throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
-        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
-        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
-        final Connector connector = ConfigUtils.getConnector(conf);
-        return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
-    }
-
-    public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
-            throws AccumuloException, AccumuloSecurityException {
-        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
-        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
-        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
-        final Connector connector = ConfigUtils.getConnector(conf);
-        return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
-    }
-
-    public static Scanner createScanner(final String tablename, final Configuration conf)
-            throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-        final Connector connector = ConfigUtils.getConnector(conf);
-        final Authorizations auths = ConfigUtils.getAuthorizations(conf);
-        return connector.createScanner(tablename, auths);
-
-    }
-
-    public static BatchScanner createBatchScanner(final String tablename, final Configuration conf)
-            throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-        final Connector connector = ConfigUtils.getConnector(conf);
-        final Authorizations auths = ConfigUtils.getAuthorizations(conf);
-        Integer numThreads = null;
-        if (conf instanceof RdfCloudTripleStoreConfiguration) {
-            numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
-        } else {
-            numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
-        }
-        return connector.createBatchScanner(tablename, auths, numThreads);
-    }
-
-    public static int getWriterMaxWriteThreads(final Configuration conf) {
-        return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
-    }
-
-    public static long getWriterMaxLatency(final Configuration conf) {
-        return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
-    }
-
-    public static long getWriterMaxMemory(final Configuration conf) {
-        return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
-    }
-
-    public static String getUsername(final JobContext job) {
-        return getUsername(job.getConfiguration());
-    }
-
-    /**
-     * Get the Accumulo username from the configuration object that is meant to
-     * be used when connecting a {@link Connector} to Accumulo.
-     *
-     * @param conf - The configuration object that will be interrogated. (not null)
-     * @return The username if one could be found; otherwise {@code null}.
-     */
-    public static String getUsername(final Configuration conf) {
-        return new AccumuloRdfConfiguration(conf).getUsername();
-    }
-
-    public static Authorizations getAuthorizations(final JobContext job) {
-        return getAuthorizations(job.getConfiguration());
-    }
-
-    public static Authorizations getAuthorizations(final Configuration conf) {
-        final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
-        if (authString.isEmpty()) {
-            return new Authorizations();
-        }
-        return new Authorizations(authString.split(","));
-    }
-
-    public static Instance getInstance(final JobContext job) {
-        return getInstance(job.getConfiguration());
-    }
-
-    /**
-     * Create an {@link Instance} that may be used to create {@link Connector}s
-     * to Accumulo. If the configuration has the {@link #USE_MOCK_INSTANCE} flag
-     * set, then the instance will be be a {@link MockInstance} instead of a
-     * Zookeeper backed instance.
-     *
-     * @param conf - The configuration object that will be interrogated. (not null)
-     * @return The {@link Instance} that may be used to connect to Accumulo.
-     */
-    public static Instance getInstance(final Configuration conf) {
-        // Pull out the Accumulo specific configuration values.
-        final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf);
-        String instanceName = accConf.getInstanceName();
-        String zoookeepers = accConf.getZookeepers();
-
-        // Create an Instance a mock if the mock flag is set.
-        if (useMockInstance(conf)) {
-            return new MockInstance(instanceName);
-        }
-
-        // Otherwise create an Instance to a Zookeeper managed instance of Accumulo.
-        return new ZooKeeperInstance(instanceName, zoookeepers);
-    }
-
-    public static String getPassword(final JobContext job) {
-        return getPassword(job.getConfiguration());
-    }
-
-    /**
-     * Get the Accumulo password from the configuration object that is meant to
-     * be used when connecting a {@link Connector} to Accumulo.
-     *
-     * @param conf - The configuration object that will be interrogated. (not null)
-     * @return The password if one could be found; otherwise an empty string.
-     */
-    public static String getPassword(final Configuration conf) {
-        return new AccumuloRdfConfiguration(conf).getPassword();
-    }
-
-    public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
-        return getConnector(job.getConfiguration());
-    }
-
-    /**
-     * Create an Accumulo {@link Connector} using the configured connection information.
-     * If the connection information  points to a mock instance of Accumulo, then the
-     * {@link #USE_MOCK_INSTANCE} flag must be set.
-     *
-     * @param conf - Configures how the connector will be built. (not null)
-     * @return A {@link Connector} that may be used to interact with the configured Accumulo instance.
-     * @throws AccumuloException The connector couldn't be created because of an Accumulo problem.
-     * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation.
-     */
-    public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
-        return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) );
-    }
-
-    /**
-     * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
-     *
-     * @param conf - The configuration object that will be interrogated. (not null)
-     * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
-     */
-    public static boolean useMockInstance(final Configuration conf) {
-        return new AccumuloRdfConfiguration(conf).useMockInstance();
-    }
-
-    protected static int getNumPartitions(final Configuration conf) {
-        return conf.getInt(NUM_PARTITIONS, 25);
-    }
-
-    public static int getFreeTextDocNumPartitions(final Configuration conf) {
-        return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
-    }
-
-    public static int getFreeTextTermNumPartitions(final Configuration conf) {
-        return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
-    }
-
-    public static boolean getUseFreeText(final Configuration conf) {
-        return conf.getBoolean(USE_FREETEXT, false);
-    }
-
-    public static boolean getUseTemporal(final Configuration conf) {
-        return conf.getBoolean(USE_TEMPORAL, false);
-    }
-
-    public static boolean getUseEntity(final Configuration conf) {
-        return conf.getBoolean(USE_ENTITY, false);
-    }
-
-    public static boolean getUsePCJ(final Configuration conf) {
-        return conf.getBoolean(USE_PCJ, false);
-    }
-
-    public static boolean getUseOptimalPCJ(final Configuration conf) {
-        return conf.getBoolean(USE_OPTIMAL_PCJ, false);
-    }
-
-    public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
-        return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
-    }
-
-
-    /**
-     * @return The name of the Fluo Application this instance of RYA is using to
-     *         incrementally update PCJs.
-     */
-    // TODO delete this eventually and use Details table
-    public Optional<String> getFluoAppName(final Configuration conf) {
-        return Optional.fromNullable(conf.get(FLUO_APP_NAME));
-    }
-
-
-    public static boolean getUseMongo(final Configuration conf) {
-        return conf.getBoolean(USE_MONGO, false);
-    }
-
-
-    public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
-
-        final List<String> indexList = Lists.newArrayList();
-        final List<String> optimizers = Lists.newArrayList();
-
-        boolean useFilterIndex = false;
-
-        if (ConfigUtils.getUseMongo(conf)) {
-            if (getUseFreeText(conf)) {
-                indexList.add(MongoFreeTextIndexer.class.getName());
-                useFilterIndex = true;
-            }
-
-            if (getUseEntity(conf)) {
-                indexList.add(MongoEntityIndexer.class.getName());
-                optimizers.add(EntityIndexOptimizer.class.getName());
-            }
-
-            if (getUseTemporal(conf)) {
-                indexList.add(MongoTemporalIndexer.class.getName());
-                useFilterIndex = true;
-            }
-        } else {
-        	if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
-        		conf.setPcjOptimizer(PCJOptimizer.class);
-        	}
-
-            if (getUsePcjUpdaterIndex(conf)) {
-                indexList.add(PrecomputedJoinIndexer.class.getName());
-            }
-
-            if (getUseFreeText(conf)) {
-                indexList.add(AccumuloFreeTextIndexer.class.getName());
-                useFilterIndex = true;
-            }
-
-            if (getUseTemporal(conf)) {
-                indexList.add(AccumuloTemporalIndexer.class.getName());
-                useFilterIndex = true;
-            }
-
-            if (getUseEntity(conf)) {
-                indexList.add(EntityCentricIndex.class.getName());
-                optimizers.add(EntityOptimizer.class.getName());
-            }
-        }
-
-        if (useFilterIndex) {
-            optimizers.add(FilterFunctionOptimizer.class.getName());
-        }
-
-        if (conf.getUseStatementMetadata()) {
-            optimizers.add(StatementMetadataOptimizer.class.getName());
-        }
-
-<<<<<<< HEAD
-        conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {}));
-        conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {}));
-    }
-}
-=======
-        final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
-        if(existingIndexers != null ) {
-            for(final String idx : existingIndexers) {
-                indexList.add(idx);
-            }
-        }
-
-        final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS);
-        if(existingOptimizers != null ) {
-            for(final String opt : existingOptimizers) {
-                optimizers.add(opt);
-            }
-        }
-
-        conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
-        conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
-    }
-
-
-
-}
->>>>>>> RYA-236 Changes to other indexers

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
index d73e180..4faf4a0 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -47,7 +47,11 @@ public class PrecomputedJoinIndexerConfig {
         /**
          * Stores each PCJ within an Accumulo table.
          */
-        ACCUMULO;
+        ACCUMULO,
+        /**
+         * Stores each PCJ within a MongoDB collection.
+         */
+        MONGO;
     }
 
     /**
@@ -106,8 +110,6 @@ public class PrecomputedJoinIndexerConfig {
         return Optional.fromNullable(updaterType);
     }
 
-
-
     public boolean getUseFluoUpdater() {
     	return config.getBoolean(ConfigUtils.USE_PCJ_UPDATER_INDEX, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
new file mode 100644
index 0000000..d3fa07e
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.openrdf.query.MalformedQueryException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.mongodb.MongoClient;
+
+/**
+ * Implementation of {@link AbstractPcjIndexSetProvider} for MongoDB.
+ */
+public class MongoPcjIndexSetProvider extends AbstractPcjIndexSetProvider {
+    /**
+     * Creates a new {@link MongoPcjIndexSetProvider}.
+     * @param conf - The configuration for this provider. (not null)
+     */
+    public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf) {
+        super(conf);
+    }
+
+    /**
+     * Creates a new {@link MongoPcjIndexSetProvider} with predefined indices.
+     * The {@link MongoClient} is not passed in; it is obtained from {@code conf}.
+     * @param conf - The configuration for this provider.
+     * @param indices - The predefined indices on this provider.
+     */
+    public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf, final List<ExternalTupleSet> indices) {
+        super(conf, indices);
+    }
+
+    @Override
+    protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException {
+        try {
+            final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+            final MongoClient client = mongoConf.getMongoClient();
+            final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(client, mongoConf.getRyaInstanceName());
+            List<String> documents = null;
+
+            documents = mongoConf.getPcjTables();
+            // This map associates each PCJ document name with its SPARQL query.
+            final Map<String, String> indexDocuments = Maps.newLinkedHashMap();
+
+            try(final PrecomputedJoinStorage storage = new MongoPcjStorage(client, mongoConf.getRyaInstanceName())) {
+
+                final boolean docsProvided = documents != null && !documents.isEmpty();
+
+                if (docsProvided) {
+                    // If PCJ document names were provided in the configuration, associate each name with its SPARQL.
+                    for (final String doc : documents) {
+                        indexDocuments.put(doc, storage.getPcjMetadata(doc).getSparql());
+                    }
+                } else if (hasRyaDetails()) {
+                    // If this is a newer install of Rya, and it has PCJ Details, then
+                    // use those.
+                    final List<String> ids = storage.listPcjs();
+                    for (final String pcjId : ids) {
+                        indexDocuments.put(pcjId, storage.getPcjMetadata(pcjId).getSparql());
+                    }
+                } else {
+                    // Otherwise figure it out by getting document IDs.
+                    documents = pcjDocs.listPcjDocuments();
+                    for (final String pcjId : documents) {
+                        if (pcjId.startsWith("INDEX")) {
+                            indexDocuments.put(pcjId, pcjDocs.getPcjMetadata(pcjId).getSparql());
+                        }
+                    }
+                }
+            }
+
+            final List<ExternalTupleSet> index = Lists.newArrayList();
+            if (indexDocuments.isEmpty()) {
+                log.info("No Index found");
+            } else {
+                for (final String pcjID : indexDocuments.keySet()) {
+                    final String indexSparqlString = indexDocuments.get(pcjID);
+                    index.add(new MongoPcjQueryNode(indexSparqlString, pcjID, pcjDocs));
+                }
+            }
+            return index;
+        } catch (final PCJStorageException | MalformedQueryException e) {
+            throw new PcjIndexSetException("Failed to get indicies for this PCJ index.", e);
+        }
+    }
+
+    private boolean hasRyaDetails() {
+        final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+        final RyaDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(mongoConf.getMongoClient(), mongoConf.getRyaInstanceName());
+        try {
+            detailsRepo.getRyaInstanceDetails();
+            return true;
+        } catch (final RyaDetailsRepositoryException e) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
new file mode 100644
index 0000000..c03ee99
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.api.utils.IteratorWrapper;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Indexing node for PCJ expressions to be inserted into execution plans.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoPcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator {
+    private static final Logger log = Logger.getLogger(MongoPcjQueryNode.class);
+    private final String pcjId;
+    private final MongoPcjDocuments pcjDocs;
+
+    /**
+     * Creates a new {@link MongoPcjQueryNode} whose projection is taken from the parsed SPARQL.
+     *
+     * @param sparql - SPARQL query whose results are stored in the PCJ document. (not empty or null)
+     * @param pcjId - name of an existing PCJ. (not empty or null)
+     * @param pcjDocs - {@link MongoPcjDocuments} used to maintain PCJs in mongo. (not null)
+     * @throws MalformedQueryException - the SPARQL query does not contain a projection.
+     */
+    public MongoPcjQueryNode(final String sparql, final String pcjId, final MongoPcjDocuments pcjDocs) throws MalformedQueryException {
+        checkArgument(!Strings.isNullOrEmpty(sparql));
+        checkArgument(!Strings.isNullOrEmpty(pcjId));
+        this.pcjDocs = checkNotNull(pcjDocs);
+        this.pcjId = pcjId;
+
+        final ParsedTupleQuery parsedQuery = (ParsedTupleQuery) new SPARQLParser().parseQuery(sparql, null);
+        checkArgument(PCJOptimizerUtilities.isPCJValid(parsedQuery.getTupleExpr()), "TupleExpr is an invalid PCJ.");
+
+        final Optional<Projection> found = new ParsedQueryUtil().findProjection(parsedQuery);
+        if (found.isPresent()) {
+            setProjectionExpr(found.get());
+        } else {
+            throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection.");
+        }
+    }
+
+    /**
+     * Creates a new {@link MongoPcjQueryNode} backed by a {@link MongoPcjDocuments} built from the configuration's Mongo client and Rya instance name.
+     *
+     * @param conf - configuration used to connect to mongo; must be a {@link StatefulMongoDBRdfConfiguration}. (not null)
+     * @param pcjId - name of an existing PCJ. (not empty or null)
+     */
+    public MongoPcjQueryNode(final Configuration conf, final String pcjId) {
+        checkNotNull(conf);
+        checkArgument(conf instanceof StatefulMongoDBRdfConfiguration,
+                "The configuration must be a StatefulMongoDBRdfConfiguration, found: " + conf.getClass().getSimpleName());
+        checkArgument(!Strings.isNullOrEmpty(pcjId)); // also rejects null, so pcjId is non-null from here on
+        final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+        this.pcjId = pcjId;
+        this.pcjDocs = new MongoPcjDocuments(mongoConf.getMongoClient(), mongoConf.getRyaInstanceName());
+    }
+
+    /**
+     * Returns the cardinality reported by the PCJ's metadata, for query planning; logs an error and returns 0 if the metadata cannot be read.
+     */
+    @Override
+    public double cardinality() {
+        try {
+            return pcjDocs.getPcjMetadata(pcjId).getCardinality();
+        } catch (final PcjException e) {
+            log.error("The PCJ has not been created, so has no cardinality.", e);
+            return 0;
+        }
+    }
+
+    /** Evaluates the PCJ against a single binding set by delegating to the collection overload. */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindingset)
+            throws QueryEvaluationException {
+        return evaluate(Collections.singleton(bindingset));
+    }
+
+    /** Returns the results {@code pcjDocs.getResults} yields for the given binding sets; an empty collection yields an empty iteration. */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
+            throws QueryEvaluationException {
+        if (bindingset.isEmpty()) {
+            return new IteratorWrapper<BindingSet, QueryEvaluationException>(Collections.<BindingSet>emptySet().iterator());
+        }
+
+        final CloseableIterator<BindingSet> results = pcjDocs.getResults(pcjId, bindingset);
+        return new CloseableIteration<BindingSet, QueryEvaluationException>() {
+            @Override
+            public boolean hasNext() throws QueryEvaluationException {
+                return results.hasNext();
+            }
+
+            @Override
+            public BindingSet next() throws QueryEvaluationException {
+                return results.next();
+            }
+
+            @Override
+            public void remove() throws QueryEvaluationException {
+                results.remove();
+            }
+
+            @Override
+            public void close() throws QueryEvaluationException {
+                try {
+                    results.close();
+                } catch (final Exception cause) {
+                    throw new QueryEvaluationException(cause.getMessage(), cause);
+                }
+            }
+        };
+    }
+
+    /** Builds a label from the projection elements of this node's tuple expression, collapsing whitespace runs to single spaces. */
+    @Override
+    public String getSignature() {
+        final String elements = Joiner.on(", ").join(super.getTupleExpr().getProjectionElemList().getElements());
+        return "(Mongo PcjQueryNode) " + elements.replaceAll("\\s+", " ");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
deleted file mode 100644
index 4a15665..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.matching;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
-import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.matching.ExternalSetProvider;
-import org.apache.rya.indexing.external.matching.QuerySegment;
-import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
- * This provider uses either user specified Accumulo configuration information or user a specified
- * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets.  If Accumulo configuration
- * is provided, the provider connects to an instance of RyaDetails and populates the cache with
- * PCJs registered in RyaDetails.
- *
- */
-public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> {
-
-    private static final Logger log = Logger.getLogger(ExternalSetProvider.class);
-    private static final PCJToSegmentConverter converter = new PCJToSegmentConverter();
-    private List<ExternalTupleSet> indexCache;
-    private final Configuration conf;
-    private boolean init = false;
-
-    public AccumuloIndexSetProvider(final Configuration conf) {
-        this.conf = Objects.requireNonNull(conf);
-    }
-
-    public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
-        this(conf);
-        indexCache = indices;
-        init = true;
-    }
-
-    /**
-     *
-     * @return - size of underlying PCJ cache
-     * @throws Exception
-     */
-    public int size() throws Exception {
-        if(!init) {
-            indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices());
-            init = true;
-        }
-        return indexCache.size();
-    }
-
-    /**
-     * @param segment - QuerySegment used to get relevant queries form index cache for matching
-     * @return List of PCJs for matching
-     */
-    @Override
-    public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) {
-        try {
-            if(!init) {
-                indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices());
-                init = true;
-            }
-            final TupleExpr query = segment.getQuery().getTupleExpr();
-            final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache);
-            final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices();
-            final List<ExternalTupleSet> tuples = new ArrayList<>();
-            for (final ExternalTupleSet tuple: pcjs) {
-                final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple);
-                if (segment.containsQuerySegment(pcj)) {
-                    tuples.add(tuple);
-                }
-            }
-            return tuples;
-
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * @param segment - QuerySegment used to get relevant queries form index cache for matching
-     *
-     * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan
-     */
-    @Override
-    public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) {
-        final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes());
-        return comboGen.getValidIndexCombos(getExternalSets(segment));
-    }
-
-    /**
-     *
-     *
-     * @param conf
-     *            - client configuration
-     *
-     * @return - list of {@link ExternalTupleSet}s or PCJs that are either
-     *         specified by user in Configuration or exist in system.
-     *
-     * @throws MalformedQueryException
-     * @throws SailException
-     * @throws QueryEvaluationException
-     * @throws TableNotFoundException
-     * @throws AccumuloException
-     * @throws AccumuloSecurityException
-     * @throws PcjException
-     */
-    private List<ExternalTupleSet> getAccIndices() throws Exception {
-
-        Objects.requireNonNull(conf);
-        final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
-        final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf));
-        List<String> tables = null;
-
-        if (conf instanceof RdfCloudTripleStoreConfiguration) {
-            tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
-        }
-        // this maps associates pcj table name with pcj sparql query
-        final Map<String, String> indexTables = Maps.newLinkedHashMap();
-
-        try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
-            final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
-
-            final boolean tablesProvided = tables != null && !tables.isEmpty();
-
-            if (tablesProvided) {
-                // if tables provided, associate table name with sparql
-                for (final String table : tables) {
-                    indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
-                }
-            } else if (hasRyaDetails(tablePrefix, conn)) {
-                // If this is a newer install of Rya, and it has PCJ Details, then
-                // use those.
-                final List<String> ids = storage.listPcjs();
-                for (final String id : ids) {
-                    indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
-                }
-            } else {
-                // Otherwise figure it out by scanning tables.
-                final PcjTables pcjTables = new PcjTables();
-                for (final String table : conn.tableOperations().list()) {
-                    if (table.startsWith(tablePrefix + "INDEX")) {
-                        indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
-                    }
-                }
-            }
-        }
-
-        // use table name sparql map (indexTables) to create {@link
-        // AccumuloIndexSet}
-        final List<ExternalTupleSet> index = Lists.newArrayList();
-        if (indexTables.isEmpty()) {
-            log.info("No Index found");
-        } else {
-            for (final String table : indexTables.keySet()) {
-                final String indexSparqlString = indexTables.get(table);
-                index.add(new AccumuloIndexSet(indexSparqlString, conf, table));
-            }
-        }
-
-
-        return index;
-    }
-
-    private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) {
-        final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName);
-        try {
-            detailsRepo.getRyaInstanceDetails();
-            return true;
-        } catch (final RyaDetailsRepositoryException e) {
-            return false;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
index 75b48b4..8067a85 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
@@ -33,6 +33,10 @@ import org.apache.rya.indexing.external.matching.QueryNodeListRater;
 import org.apache.rya.indexing.external.matching.QuerySegment;
 import org.apache.rya.indexing.external.matching.TopOfQueryFilterRelocator;
 import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.Dataset;
 import org.openrdf.query.algebra.QueryModelNode;
@@ -58,29 +62,32 @@ import com.google.common.base.Optional;;
  */
 public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>implements Configurable {
     private static final PCJExternalSetMatcherFactory factory = new PCJExternalSetMatcherFactory();
-    private AccumuloIndexSetProvider provider;
+    private AbstractPcjIndexSetProvider provider;
     private Configuration conf;
     private boolean init = false;
 
     public PCJOptimizer() {}
 
     public PCJOptimizer(final Configuration conf) {
-       setConf(conf);
+        setConf(conf);
     }
 
     /**
      * This constructor is designed to be used for testing.  A more typical use
      * pattern is for a user to specify Accumulo connection details in a Configuration
      * file so that PCJs can be retrieved by an AccumuloIndexSetProvider.
-     * 
-     * @param indices - user specified PCJs to match to query
+     *
+     * @param indices - user specified PCJs to match to query. (not null)
      * @param useOptimalPcj - optimize PCJ combos for matching
+     * @param provider - The provider to use in this optimizer. (not null)
      */
-    public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj) {
+    public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj,
+            final AbstractPcjIndexSetProvider provider) {
         checkNotNull(indices);
+        checkNotNull(provider);
         conf = new Configuration();
-        this.useOptimal = useOptimalPcj;
-        provider = new AccumuloIndexSetProvider(conf, indices);
+        useOptimal = useOptimalPcj;
+        this.provider = provider;
         init = true;
     }
 
@@ -90,9 +97,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
         if (!init) {
             try {
                 this.conf = conf;
-                this.useOptimal = ConfigUtils.getUseOptimalPCJ(conf);
-                provider = new AccumuloIndexSetProvider(conf);
-            } catch (Exception e) {
+                useOptimal = ConfigUtils.getUseOptimalPCJ(conf);
+                if (conf instanceof StatefulMongoDBRdfConfiguration) {
+                    final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+                    provider = new MongoPcjIndexSetProvider(mongoConf);
+                } else {
+                    provider = new AccumuloIndexSetProvider(conf);
+                }
+            } catch (final Exception e) {
                 throw new Error(e);
             }
             init = true;
@@ -124,14 +136,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
             } else {
                 return;
             }
-        } catch (Exception e) {
-            throw new RuntimeException("Could not populate Accumulo Index Cache.");
+        } catch (final Exception e) {
+            throw new RuntimeException("Could not populate Index Cache.", e);
         }
     }
 
 
     @Override
-    protected ExternalSetMatcher<ExternalTupleSet> getMatcher(QuerySegment<ExternalTupleSet> segment) {
+    protected ExternalSetMatcher<ExternalTupleSet> getMatcher(final QuerySegment<ExternalTupleSet> segment) {
         return factory.getMatcher(segment);
     }
 
@@ -141,7 +153,7 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
     }
 
     @Override
-    protected Optional<QueryNodeListRater> getNodeListRater(QuerySegment<ExternalTupleSet> segment) {
+    protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<ExternalTupleSet> segment) {
         return Optional.of(new BasicRater(segment.getOrderedNodes()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
index 1ad03b6..09a2706 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.apache.rya.indexing.external.matching.QuerySegment;
 import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
 import org.apache.rya.indexing.pcj.matching.QueryVariableNormalizer.VarCollector;
-
 import org.openrdf.query.algebra.Difference;
 import org.openrdf.query.algebra.EmptySet;
 import org.openrdf.query.algebra.Filter;
@@ -61,7 +60,7 @@ public class PCJOptimizerUtilities {
 	 * @param node - node to be checked for validity
 	 * @return - true if valid and false otherwise
 	 */
-	public static boolean isPCJValid(TupleExpr node) {
+	public static boolean isPCJValid(final TupleExpr node) {
 
 		final ValidQueryVisitor vqv = new ValidQueryVisitor();
 		node.visit(vqv);
@@ -91,16 +90,16 @@ public class PCJOptimizerUtilities {
 	 * @param node - PCJ {@link ExternalTupleSet} index node to be checked for validity
 	 * @return - true if valid and false otherwise
 	 */
-	public static boolean isPCJValid(ExternalTupleSet node) {
+	public static boolean isPCJValid(final ExternalTupleSet node) {
 		return isPCJValid(node.getTupleExpr());
 	}
 
 	public static List<ExternalTupleSet> getValidPCJs(
-			List<ExternalTupleSet> pcjs) {
+			final List<ExternalTupleSet> pcjs) {
 
-		Iterator<ExternalTupleSet> iterator = pcjs.iterator();
+		final Iterator<ExternalTupleSet> iterator = pcjs.iterator();
 		while (iterator.hasNext()) {
-			ExternalTupleSet pcj = iterator.next();
+			final ExternalTupleSet pcj = iterator.next();
 			if (!isPCJValid(pcj)) {
 				iterator.remove();
 			}
@@ -109,8 +108,8 @@ public class PCJOptimizerUtilities {
 	}
 
 
-	public static Projection getProjection(TupleExpr te) {
-		ProjectionVisitor visitor = new ProjectionVisitor();
+	public static Projection getProjection(final TupleExpr te) {
+		final ProjectionVisitor visitor = new ProjectionVisitor();
 		te.visit(visitor);
 		return visitor.node;
 	}
@@ -120,7 +119,7 @@ public class PCJOptimizerUtilities {
 		Projection node = null;
 
 		@Override
-		public void meet(Projection node) {
+		public void meet(final Projection node) {
 			this.node = node;
 		}
 	}
@@ -131,13 +130,13 @@ public class PCJOptimizerUtilities {
 	 *            - filters to be pushed down into next {@link QuerySegment}, or
 	 *            as far down as binding variable names permit.
 	 */
-	public static void relocateFilters(Set<Filter> filters) {
-		for (Filter filter : filters) {
+	public static void relocateFilters(final Set<Filter> filters) {
+		for (final Filter filter : filters) {
 			FilterRelocator.relocate(filter);
 		}
 	}
 
-	private static Set<String> getVarNames(Collection<QueryModelNode> nodes) {
+	private static Set<String> getVarNames(final Collection<QueryModelNode> nodes) {
 		List<String> tempVars;
 		final Set<String> nodeVarNames = Sets.newHashSet();
 
@@ -159,8 +158,8 @@ public class PCJOptimizerUtilities {
 			QueryModelVisitorBase<RuntimeException> {
 
 		private boolean isValid = true;
-		private Set<QueryModelNode> filterSet = Sets.newHashSet();
-		private Set<QueryModelNode> spSet = Sets.newHashSet();
+		private final Set<QueryModelNode> filterSet = Sets.newHashSet();
+		private final Set<QueryModelNode> spSet = Sets.newHashSet();
 		private int joinCount = 0;
 
 		public Set<QueryModelNode> getFilters() {
@@ -180,35 +179,35 @@ public class PCJOptimizerUtilities {
 		}
 
 		@Override
-		public void meet(Projection node) {
+		public void meet(final Projection node) {
 			node.getArg().visit(this);
 		}
 
 		@Override
-		public void meet(Filter node) {
+		public void meet(final Filter node) {
 			filterSet.add(node.getCondition());
 			node.getArg().visit(this);
 		}
 
 		@Override
-		public void meet(StatementPattern node) {
+		public void meet(final StatementPattern node) {
 			spSet.add(node);
 		}
 
 		@Override
-		public void meet(Join node) {
+		public void meet(final Join node) {
 			joinCount++;
 			super.meet(node);
 		}
 
 		@Override
-		public void meet(LeftJoin node) {
+		public void meet(final LeftJoin node) {
 			joinCount++;
 			super.meet(node);
 		}
 
 		@Override
-		public void meetNode(QueryModelNode node) {
+		public void meetNode(final QueryModelNode node) {
 			if (!(node instanceof Join || node instanceof LeftJoin
 					|| node instanceof StatementPattern || node instanceof Var
 					|| node instanceof Union || node instanceof Filter || node instanceof Projection)) {
@@ -238,18 +237,18 @@ public class PCJOptimizerUtilities {
 		protected Filter filter;
 		protected Set<String> filterVars;
 
-		public FilterRelocator(Filter filter) {
+		public FilterRelocator(final Filter filter) {
 			this.filter = filter;
 			filterVars = VarNameCollector.process(filter.getCondition());
 		}
 
-		public static void relocate(Filter filter) {
+		public static void relocate(final Filter filter) {
 			final FilterRelocator fr = new FilterRelocator(filter);
 			filter.visit(fr);
 		}
 
 		@Override
-		protected void meetNode(QueryModelNode node) {
+		protected void meetNode(final QueryModelNode node) {
 			// By default, do not traverse
 			assert node instanceof TupleExpr;
 
@@ -263,7 +262,7 @@ public class PCJOptimizerUtilities {
 		}
 
 		@Override
-		public void meet(Join join) {
+		public void meet(final Join join) {
 			if (join.getRightArg().getBindingNames().containsAll(filterVars)) {
 				// All required vars are bound by the left expr
 				join.getRightArg().visit(this);
@@ -277,12 +276,12 @@ public class PCJOptimizerUtilities {
 		}
 
 		@Override
-		public void meet(Filter node) {
+		public void meet(final Filter node) {
 			node.getArg().visit(this);
 		}
 
 		@Override
-		public void meet(LeftJoin leftJoin) {
+		public void meet(final LeftJoin leftJoin) {
 			if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) {
 				leftJoin.getLeftArg().visit(this);
 			} else {
@@ -291,7 +290,7 @@ public class PCJOptimizerUtilities {
 		}
 
 		@Override
-        public void meet(Union union) {
+        public void meet(final Union union) {
             boolean filterMoved = false;
             if (Sets.intersection(union.getRightArg().getBindingNames(), filterVars).size() > 0) {
                 relocate(filter, union.getRightArg());
@@ -300,7 +299,7 @@ public class PCJOptimizerUtilities {
  
             if (Sets.intersection(union.getLeftArg().getBindingNames(), filterVars).size() > 0) {
                 if (filterMoved) {
-                    Filter clone = new Filter(filter.getArg(), filter.getCondition().clone());
+                    final Filter clone = new Filter(filter.getArg(), filter.getCondition().clone());
                     relocate(clone, union.getLeftArg());
                 } else {
                     relocate(filter, union.getLeftArg());
@@ -309,36 +308,36 @@ public class PCJOptimizerUtilities {
         }
 
 		@Override
-		public void meet(Difference node) {
+		public void meet(final Difference node) {
 			if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) {
 				relocate(filter, node.getRightArg());
 			} else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) {
-				Filter clone = new Filter(filter.getArg(), filter
+				final Filter clone = new Filter(filter.getArg(), filter
 						.getCondition().clone());
 				relocate(clone, node.getLeftArg());
 			}
 		}
 
 		@Override
-		public void meet(Intersection node) {
+		public void meet(final Intersection node) {
 			if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) {
 				relocate(filter, node.getRightArg());
 			} else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) {
-				Filter clone = new Filter(filter.getArg(), filter
+				final Filter clone = new Filter(filter.getArg(), filter
 						.getCondition().clone());
 				relocate(clone, node.getLeftArg());
 			}
 		}
 
 		@Override
-		public void meet(EmptySet node) {
+		public void meet(final EmptySet node) {
 			if (filter.getParentNode() != null) {
 				// Remove filter from its original location
 				filter.replaceWith(filter.getArg());
 			}
 		}
 
-		protected void relocate(Filter filter, TupleExpr newFilterArg) {
+		protected void relocate(final Filter filter, final TupleExpr newFilterArg) {
 			if (!filter.getArg().equals(newFilterArg)) {
 				if (filter.getParentNode() != null) {
 					// Remove filter from its original location
@@ -351,10 +350,8 @@ public class PCJOptimizerUtilities {
 		}
 	}
 
-
-
-	public static boolean tupleContainsLeftJoins(TupleExpr node) {
-	    LeftJoinVisitor lj = new LeftJoinVisitor();
+	public static boolean tupleContainsLeftJoins(final TupleExpr node) {
+	    final LeftJoinVisitor lj = new LeftJoinVisitor();
 	    node.visit(lj);
         return lj.containsLeftJoin;
     }
@@ -368,18 +365,8 @@ public class PCJOptimizerUtilities {
         }
 
         @Override
-        public void meet(LeftJoin node) {
+        public void meet(final LeftJoin node) {
             containsLeftJoin = true;
         }
     }
-
-
-
-
-
-
-
-
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
new file mode 100644
index 0000000..984153a
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.matching.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
+import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.matching.PCJToSegmentConverter;
+import org.openrdf.query.algebra.TupleExpr;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Abstraction of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
+ * Implementations use either user-specified configuration information or a user-specified
+ * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets.  If a configuration
+ * is provided, the provider connects to an instance of RyaDetails and populates the cache with
+ * PCJs registered in RyaDetails.
+ */
+public abstract class AbstractPcjIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> {
+    protected static final Logger log = Logger.getLogger(AbstractPcjIndexSetProvider.class);
+    protected static final PCJToSegmentConverter converter = new PCJToSegmentConverter();
+    protected List<ExternalTupleSet> indexCache;
+    protected final Configuration conf;
+    protected boolean init = false;
+
+    /**
+     * Creates a new {@link AbstractPcjIndexSetProvider} based on configuration only.
+     * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}.
+     */
+    public AbstractPcjIndexSetProvider(final Configuration conf) {
+        this.conf = requireNonNull(conf);
+    }
+
+    /**
+     * Creates a new {@link AbstractPcjIndexSetProvider} based on user-provided {@link ExternalTupleSet}s.
+     * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}.
+     * @param indices - The {@link ExternalTupleSet}s to populate the internal cache.
+     */
+    public AbstractPcjIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
+        requireNonNull(conf);
+        this.conf = conf;
+        indexCache = indices;
+        init = true;
+    }
+
+
+    /**
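+     * Replaces the internal cache of {@link ExternalTupleSet}s and marks this provider as initialized.
+     * @param indices - The {@link ExternalTupleSet}s to use as the internal cache.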
+     */
+    @VisibleForTesting
+    public void setIndices(final List<ExternalTupleSet> indices) {
+        indexCache = indices;
+        init = true;
+    }
+
+    /**
+     * @param segment - QuerySegment used to get relevant queries from the index cache for matching
+     *
+     * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan
+     */
+    @Override
+    public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) {
+        final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes());
+        return comboGen.getValidIndexCombos(getExternalSets(segment));
+    }
+
+    /**
+     * @param segment - QuerySegment used to get relevant queries from the index cache for matching
+     * @return List of PCJs for matching
+     */
+    @Override
+    public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) {
+        try {
+            if(!init) {
+                indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices());
+                init = true;
+            }
+            final TupleExpr query = segment.getQuery().getTupleExpr();
+            final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache);
+            final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices();
+            final List<ExternalTupleSet> tuples = new ArrayList<>();
+            for (final ExternalTupleSet tuple: pcjs) {
+                final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple);
+                if (segment.containsQuerySegment(pcj)) {
+                    tuples.add(tuple);
+                }
+            }
+            return tuples;
+
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return The size of the set index cache.
+     * @throws Exception - if the index cache has not been populated yet and loading the PCJs fails.
+     */
+    public int size() throws Exception {
+        if (!init) {
+            indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices());
+            init = true;
+        }
+        return indexCache.size();
+    }
+
+    /**
+     * Retrieves the PCJs that are used to populate the index cache.
+     * @return - list of {@link ExternalTupleSet}s or PCJs that are either
+     *         specified by user in Configuration or exist in system.
+     *
+     * @throws PcjIndexSetException - if the PCJs could not be retrieved.
+     */
+    protected abstract List<ExternalTupleSet> getIndices() throws PcjIndexSetException;
+
+    /**
+     * Exception thrown when failing to get the defined PCJs for a particular
+     * index.
+     */
+    public class PcjIndexSetException extends Exception {
+        public PcjIndexSetException(final String message) {
+            super(message);
+        }
+
+        public PcjIndexSetException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
new file mode 100644
index 0000000..1fa3677
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.matching.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of {@link AbstractPcjIndexSetProvider} for Accumulo.
+ * This provider uses either user-specified Accumulo configuration information or a user-specified
+ * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets.  If Accumulo configuration
+ * is provided, the provider connects to an instance of RyaDetails and populates the cache with
+ * PCJs registered in RyaDetails.
+ */
+public class AccumuloIndexSetProvider extends AbstractPcjIndexSetProvider {
+    private static final Logger log = Logger.getLogger(AccumuloIndexSetProvider.class);
+
+    public AccumuloIndexSetProvider(final Configuration conf) {
+        super(conf);
+    }
+
+    public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
+        super(conf, indices);
+    }
+
+    @Override
+    protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException {
+        requireNonNull(conf);
+        try {
+            final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
+            final Connector conn = requireNonNull(ConfigUtils.getConnector(conf));
+            List<String> tables = null;
+
+            if (conf instanceof RdfCloudTripleStoreConfiguration) {
+                tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
+            }
+            // this map associates each PCJ table name with its PCJ SPARQL query
+            final Map<String, String> indexTables = Maps.newLinkedHashMap();
+
+            try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
+                final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
+
+                final boolean tablesProvided = tables != null && !tables.isEmpty();
+
+                if (tablesProvided) {
+                    // if tables provided, associate table name with sparql
+                    for (final String table : tables) {
+                        indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
+                    }
+                } else if (hasRyaDetails(tablePrefix, conn)) {
+                    // If this is a newer install of Rya, and it has PCJ Details, then
+                    // use those.
+                    final List<String> ids = storage.listPcjs();
+                    for (final String id : ids) {
+                        indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
+                    }
+                } else {
+                    // Otherwise figure it out by scanning tables.
+                    final PcjTables pcjTables = new PcjTables();
+                    for (final String table : conn.tableOperations().list()) {
+                        if (table.startsWith(tablePrefix + "INDEX")) {
+                            indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+                        }
+                    }
+                }
+            }
+
+            // use table name sparql map (indexTables) to create {@link
+            // AccumuloIndexSet}
+            final List<ExternalTupleSet> index = Lists.newArrayList();
+            if (indexTables.isEmpty()) {
+                log.info("No Index found");
+            } else {
+                for (final String table : indexTables.keySet()) {
+                    final String indexSparqlString = indexTables.get(table);
+                    index.add(new AccumuloIndexSet(indexSparqlString, conf, table));
+                }
+            }
+
+            return index;
+        } catch (final PCJStorageException | AccumuloException | AccumuloSecurityException | MalformedQueryException
+                | SailException | QueryEvaluationException | TableNotFoundException e) {
+            throw new PcjIndexSetException("Failed to retrieve the indicies.", e);
+        }
+    }
+
+    private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) {
+        final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName);
+        try {
+            detailsRepo.getRyaInstanceDetails();
+            return true;
+        } catch (final RyaDetailsRepositoryException e) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 5028454..78b4f52 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -107,7 +107,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Run the test.
-            ryaClient.getBatchUpdatePCJ().get().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+            ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
 
             // Verify the correct results were loaded into the PCJ table.
             final Set<BindingSet> expectedResults = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
index da717d7..a8c7455 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
@@ -23,10 +23,9 @@ import static org.junit.Assert.assertTrue;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.rya.accumulo.AccumuloITBase;
-import org.apache.rya.api.client.Install;
 import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
 import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.RyaClientException;
 import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
 import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
@@ -41,6 +40,14 @@ public class AccumuloInstallIT extends AccumuloITBase {
     public void install() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException {
         // Install an instance of Rya.
         final String instanceName = getRyaInstanceName();
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+        		getUsername(),
+        		getPassword().toCharArray(),
+        		getInstanceName(),
+        		getZookeepers());
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        
         final InstallConfiguration installConfig = InstallConfiguration.builder()
                 .setEnableTableHashPrefix(false)
                 .setEnableEntityCentricIndex(false)
@@ -48,39 +55,58 @@ public class AccumuloInstallIT extends AccumuloITBase {
                 .setEnableTemporalIndex(false)
                 .setEnablePcjIndex(false)
                 .setEnableGeoIndex(false)
-                .setFluoPcjAppName("fluo_app_name")
                 .build();
 
+
+        ryaClient.getInstall().install(instanceName, installConfig);
+
+        // Check that the instance exists.
+        assertTrue( ryaClient.getInstanceExists().exists(instanceName) );
+    }
+
+    @Test
+    public void install_withIndexers() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException {
+        // Install an instance of Rya.
+        final String instanceName = getRyaInstanceName();
         final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
-                getUsername(),
-                getPassword().toCharArray(),
-                getInstanceName(),
-                getZookeepers());
+        		getUsername(),
+        		getPassword().toCharArray(),
+        		getInstanceName(),
+        		getZookeepers());
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(true)
+                .setEnableEntityCentricIndex(true)
+                .setEnableFreeTextIndex(true)
+                .setEnableTemporalIndex(true)
+                .setEnablePcjIndex(true)
+                .setEnableGeoIndex(true)
+                .build();
 
-        final Install install = new AccumuloInstall(connectionDetails, getConnector());
-        install.install(instanceName, installConfig);
+        ryaClient.getInstall().install(instanceName, installConfig);
 
         // Check that the instance exists.
-        final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector());
-        assertTrue( instanceExists.exists(instanceName) );
+        assertTrue( ryaClient.getInstanceExists().exists(instanceName) );
     }
 
     @Test(expected = DuplicateInstanceNameException.class)
     public void install_alreadyExists() throws DuplicateInstanceNameException, RyaClientException, AccumuloException, AccumuloSecurityException {
         // Install an instance of Rya.
         final String instanceName = getRyaInstanceName();
-        final InstallConfiguration installConfig = InstallConfiguration.builder().build();
-
         final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
-                getUsername(),
-                getPassword().toCharArray(),
-                getInstanceName(),
-                getZookeepers());
+        		getUsername(),
+        		getPassword().toCharArray(),
+        		getInstanceName(),
+        		getZookeepers());
 
-        final Install install = new AccumuloInstall(connectionDetails, getConnector());
-        install.install(instanceName, installConfig);
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        
+        final InstallConfiguration installConfig = InstallConfiguration.builder().build();
+        ryaClient.getInstall().install(instanceName, installConfig);
 
         // Install it again.
-        install.install(instanceName, installConfig);
+        ryaClient.getInstall().install(instanceName, installConfig);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
index c4e4d54..e88e35c 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
@@ -22,11 +22,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.rya.accumulo.AccumuloITBase;
-import org.apache.rya.api.client.Install;
 import org.apache.rya.api.client.Install.InstallConfiguration;
 import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.InstanceExists;
-import org.apache.rya.api.client.Uninstall;
+import org.apache.rya.api.client.RyaClient;
 import org.junit.Test;
 
 /**
@@ -62,20 +60,16 @@ public class AccumuloUninstallIT extends AccumuloITBase {
                 getPassword().toCharArray(),
                 getInstanceName(),
                 getZookeepers());
-
-        final Install install = new AccumuloInstall(connectionDetails, getConnector());
-        final String ryaInstanceName = getRyaInstanceName();
-        install.install(ryaInstanceName, installConfig);
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+        ryaClient.getInstall().install(getRyaInstanceName(), installConfig);
 
         // Check that the instance exists.
-        final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector());
-        assertTrue( instanceExists.exists(ryaInstanceName) );
+        assertTrue( ryaClient.getInstanceExists().exists(getRyaInstanceName()) );
 
         // Uninstall the instance of Rya.
-        final Uninstall uninstall = new AccumuloUninstall(connectionDetails, getConnector());
-        uninstall.uninstall(ryaInstanceName);
+        ryaClient.getUninstall().uninstall(getRyaInstanceName());
 
         // Verify that it no longer exists.
-        assertFalse( instanceExists.exists(ryaInstanceName) );
+        assertFalse( ryaClient.getInstanceExists().exists(getRyaInstanceName()) );
     }
 }
\ No newline at end of file